Skip to content

Commit

Permalink
fix state convert
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing (C) committed May 5, 2019
1 parent 40e85f4 commit 74032e2
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 56 deletions.
61 changes: 47 additions & 14 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job
job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

newJob, err := cc.needUpdateForVolumeClaim(job)
if err != nil {
if update, err := cc.filljob(job); err != nil || update {
return err
}
if newJob != nil {
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return nil
}

if err := cc.pluginOnJobAdd(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError),
Expand All @@ -168,6 +159,18 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
return err
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}

return nil
}

Expand Down Expand Up @@ -356,10 +359,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
return nil
}

func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) {
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) {
// If VolumeClaimName does not exist, generate them for Job.
var newJob *vkv1.Job
volumes := job.Spec.Volumes
update := false
for index, volume := range volumes {
vcName := volume.VolumeClaimName
if len(vcName) == 0 {
Expand All @@ -368,7 +372,7 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return nil, err
return false, nil, err
}
if exist {
continue
Expand All @@ -377,11 +381,12 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
newJob = job.DeepCopy()
}
newJob.Spec.Volumes[index].VolumeClaimName = vcName
update = true
break
}
}
}
return newJob, nil
return update, newJob, nil
}

func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
Expand Down Expand Up @@ -494,3 +499,31 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {

return minAvailableTasksRes.Convert2K8sResource()
}

func (cc *Controller) filljob(job *vkv1.Job) (bool, error) {
update, newJob, err := cc.needUpdateForVolumeClaim(job)
if err != nil {
return false, err
}
if update {
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return false, err
}
return true, nil
} else if job.Status.State.Phase == "" {
job.Status.State.Phase = vkv1.Pending
if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
} else {
if e := cc.cache.Update(j); e != nil {
glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e)
}
}
return true, nil
}

return false, nil
}
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/aborted.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type abortedState struct {
func (as *abortedState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
return SyncJob(as.job, func(status *vkv1.JobStatus) {
return KillJob(as.job, func(status *vkv1.JobStatus) {
status.State.Phase = vkv1.Restarting
status.RetryCount++
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/aborting.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (ps *abortingState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
// Already in Restarting phase, just sync it
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
return KillJob(ps.job, func(status *vkv1.JobStatus) {
status.State.Phase = vkv1.Restarting
status.RetryCount++
})
Expand Down
4 changes: 1 addition & 3 deletions pkg/controllers/job/state/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewState(jobInfo *apis.JobInfo) State {
return &runningState{job: jobInfo}
case vkv1.Restarting:
return &restartingState{job: jobInfo}
case vkv1.Terminated, vkv1.Completed:
case vkv1.Terminated, vkv1.Completed, vkv1.Failed:
return &finishedState{job: jobInfo}
case vkv1.Terminating:
return &terminatingState{job: jobInfo}
Expand All @@ -57,8 +57,6 @@ func NewState(jobInfo *apis.JobInfo) State {
return &abortedState{job: jobInfo}
case vkv1.Completing:
return &completingState{job: jobInfo}
case vkv1.Failed:
return &failedState{job: jobInfo}
case vkv1.Inqueue:
return &inqueueState{job: jobInfo}
}
Expand Down
30 changes: 0 additions & 30 deletions pkg/controllers/job/state/failed.go

This file was deleted.

15 changes: 8 additions & 7 deletions pkg/controllers/job/state/restarting.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type restartingState struct {
}

func (ps *restartingState) Execute(action vkv1.Action) error {
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
return KillJob(ps.job, func(status *vkv1.JobStatus) {
phase := vkv1.Restarting

// Get the maximum number of retries.
Expand All @@ -39,12 +39,13 @@ func (ps *restartingState) Execute(action vkv1.Action) error {
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
phase = vkv1.Failed
} else {
if status.Terminating == 0 {
if status.Running >= ps.job.Job.Spec.MinAvailable {
phase = vkv1.Running
} else {
phase = vkv1.Pending
}
total := int32(0)
for _, task := range ps.job.Job.Spec.Tasks {
total += task.Replicas
}

if total-status.Terminating >= status.MinAvailable {
phase = vkv1.Pending
}
}

Expand Down

0 comments on commit 74032e2

Please sign in to comment.