Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #121]fix state convert #126

Merged
merged 1 commit into from
May 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
msg = validateJob(job, &reviewResponse)
break
case v1beta1.Update:
oldJob, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
_, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
if err != nil {
return ToAdmissionResponse(err)
}
msg = specDeepEqual(job, oldJob, &reviewResponse)
break
default:
err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'")
Expand All @@ -82,6 +81,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
return fmt.Sprintf("'minAvailable' cannot be less than zero.")
}

if job.Spec.MaxRetry < 0 {
reviewResponse.Allowed = false
return fmt.Sprintf("'maxRetry' cannot be less than zero.")
}

if len(job.Spec.Tasks) == 0 {
reviewResponse.Allowed = false
return fmt.Sprintf("No task specified in job spec")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewJobController(config *rest.Config) *Controller {
cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addJob,
// TODO: enable this until we find an appropriate way.
// UpdateFunc: cc.updateJob,
UpdateFunc: cc.updateJob,
DeleteFunc: cc.deleteJob,
})
cc.jobLister = cc.jobInformer.Lister()
Expand Down
78 changes: 59 additions & 19 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job
// Job version is bumped only when job is killed
job.Status.Version = job.Status.Version + 1
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
if job.DeletionTimestamp != nil {
glog.Infof("Job <%s/%s> is terminating, skip management process.",
Expand Down Expand Up @@ -88,6 +86,10 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)
}

job = job.DeepCopy()
//Job version is bumped only when job is killed
job.Status.Version = job.Status.Version + 1

job.Status = vkv1.JobStatus{
State: job.Status.State,

Expand All @@ -112,6 +114,8 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}
Expand All @@ -138,21 +142,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job
job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

newJob, err := cc.needUpdateForVolumeClaim(job)
if err != nil {
if update, err := cc.filljob(job); err != nil || update {
return err
}
if newJob != nil {
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return nil
}

if err := cc.pluginOnJobAdd(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError),
Expand All @@ -168,14 +163,26 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
return err
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}

return nil
}

func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
glog.V(3).Infof("Starting to sync up Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> sync up", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job
job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

if job.DeletionTimestamp != nil {
Expand Down Expand Up @@ -313,6 +320,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}
Expand Down Expand Up @@ -356,10 +365,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
return nil
}

func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) {
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) {
// If VolumeClaimName does not exist, generate them for Job.
var newJob *vkv1.Job
volumes := job.Spec.Volumes
update := false
for index, volume := range volumes {
vcName := volume.VolumeClaimName
if len(vcName) == 0 {
Expand All @@ -368,7 +378,7 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return nil, err
return false, nil, err
}
if exist {
continue
Expand All @@ -377,11 +387,12 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
newJob = job.DeepCopy()
}
newJob.Spec.Volumes[index].VolumeClaimName = vcName
update = true
break
}
}
}
return newJob, nil
return update, newJob, nil
}

func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
Expand Down Expand Up @@ -428,8 +439,9 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error {
}
pg := &kbv1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
Namespace: job.Namespace,
Name: job.Name,
Annotations: job.Annotations,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
Expand Down Expand Up @@ -494,3 +506,31 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {

return minAvailableTasksRes.Convert2K8sResource()
}

// filljob completes the fields of a job that the controller has to populate
// before the job can be processed further.
//
// It works in two mutually exclusive steps:
//
//  1. needUpdateForVolumeClaim is asked whether the job has to be rewritten
//     (per its own comment, this generates missing volume claim names). If so,
//     the job copy it returns is sent to the API server with Update.
//  2. Otherwise, if the job has no phase yet, the phase is set to Pending on
//     the passed-in job, written with UpdateStatus, and the object returned by
//     the API server is stored in the controller cache.
//
// The boolean result is true when the job was written in one of the steps
// above, and false when nothing had to be changed. A non-nil error is returned
// together with false whenever a lookup, an API write, or the cache update
// fails, so that the caller can surface the failure instead of treating the
// job as successfully handled.
func (cc *Controller) filljob(job *vkv1.Job) (bool, error) {
	update, newJob, err := cc.needUpdateForVolumeClaim(job)
	if err != nil {
		return false, err
	}

	if update {
		if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
			glog.Errorf("Failed to update Job %v/%v: %v",
				job.Namespace, job.Name, err)
			return false, err
		}
		return true, nil
	}

	// A phase is already set, so there is nothing left to fill in.
	if job.Status.State.Phase != "" {
		return false, nil
	}

	job.Status.State.Phase = vkv1.Pending
	j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
	if err != nil {
		glog.Errorf("Failed to update status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		// Propagate the failure; reporting success here would hide that the
		// phase was never persisted.
		return false, err
	}

	// Keep the cache in step with the object the API server returned. The
	// message uses Errorf because it contains formatting verbs.
	if err := cc.cache.Update(j); err != nil {
		glog.Errorf("Failed to update cache status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		return false, err
	}

	return true, nil
}
12 changes: 6 additions & 6 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
return
}

if err := cc.cache.Update(newJob); err != nil {
glog.Errorf("Failed to update job <%s/%s>: %v in cache",
newJob.Namespace, newJob.Name, err)
}

// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
// For Job status, it's used internally and always been updated via our controller.
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) {
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase {
glog.Infof("Job update event is ignored since no update in 'Spec'.")
return
}

if err := cc.cache.Update(newJob); err != nil {
glog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache",
newJob.Namespace, newJob.Name, err)
}

req := apis.Request{
Namespace: newJob.Namespace,
JobName: newJob.Name,
Expand Down
12 changes: 8 additions & 4 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
for _, task := range job.Spec.Tasks {
if task.Name == req.TaskName {
for _, policy := range task.Policies {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
if len(policy.Event) > 0 && len(req.Event) > 0 {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
}
}

// 0 is not an error code, is prevented in validation admission controller
Expand All @@ -170,8 +172,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {

// Parse Job level policies
for _, policy := range job.Spec.Policies {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
if len(policy.Event) > 0 && len(req.Event) > 0 {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
}
}

// 0 is not an error code, is prevented in validation admission controller
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/aborted.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type abortedState struct {
func (as *abortedState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
return SyncJob(as.job, func(status *vkv1.JobStatus) {
return KillJob(as.job, func(status *vkv1.JobStatus) {
status.State.Phase = vkv1.Restarting
status.RetryCount++
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/aborting.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (ps *abortingState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
// Already in Restarting phase, just sync it
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
return KillJob(ps.job, func(status *vkv1.JobStatus) {
status.State.Phase = vkv1.Restarting
status.RetryCount++
})
Expand Down
4 changes: 1 addition & 3 deletions pkg/controllers/job/state/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewState(jobInfo *apis.JobInfo) State {
return &runningState{job: jobInfo}
case vkv1.Restarting:
return &restartingState{job: jobInfo}
case vkv1.Terminated, vkv1.Completed:
case vkv1.Terminated, vkv1.Completed, vkv1.Failed:
return &finishedState{job: jobInfo}
case vkv1.Terminating:
return &terminatingState{job: jobInfo}
Expand All @@ -57,8 +57,6 @@ func NewState(jobInfo *apis.JobInfo) State {
return &abortedState{job: jobInfo}
case vkv1.Completing:
return &completingState{job: jobInfo}
case vkv1.Failed:
return &failedState{job: jobInfo}
case vkv1.Inqueue:
return &inqueueState{job: jobInfo}
}
Expand Down
30 changes: 0 additions & 30 deletions pkg/controllers/job/state/failed.go

This file was deleted.

15 changes: 8 additions & 7 deletions pkg/controllers/job/state/restarting.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type restartingState struct {
}

func (ps *restartingState) Execute(action vkv1.Action) error {
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
return KillJob(ps.job, func(status *vkv1.JobStatus) {
phase := vkv1.Restarting

// Get the maximum number of retries.
Expand All @@ -39,12 +39,13 @@ func (ps *restartingState) Execute(action vkv1.Action) error {
// Failed is the phase of a job whose restart failed after reaching the maximum number of retries.
phase = vkv1.Failed
} else {
if status.Terminating == 0 {
if status.Running >= ps.job.Job.Spec.MinAvailable {
phase = vkv1.Running
} else {
phase = vkv1.Pending
}
total := int32(0)
for _, task := range ps.job.Job.Spec.Tasks {
total += task.Replicas
}

if total-status.Terminating >= status.MinAvailable {
phase = vkv1.Pending
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var _ = Describe("Job E2E Test: Test Job Command", func() {
//Job is pending
err := waitJobPending(context, job)
Expect(err).NotTo(HaveOccurred())
err = waitJobStatePending(context, job)
err = waitJobStateInqueue(context, job)
Expect(err).NotTo(HaveOccurred())

//Suspend job and wait status change
Expand Down
Loading