diff --git a/pkg/admission/admit_job.go b/pkg/admission/admit_job.go index 41c664a4c4..93eea20596 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/admit_job.go @@ -144,10 +144,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st } } - //TODO(tommylikehu): Fix me and enable it. - //if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok { - // msg = msg + validateInfo - //} + if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok { + msg = msg + validateInfo + } if msg != "" { reviewResponse.Allowed = false diff --git a/pkg/admission/mutate_job.go b/pkg/admission/mutate_job.go index 63d0c270a9..f7ed88f2c2 100644 --- a/pkg/admission/mutate_job.go +++ b/pkg/admission/mutate_job.go @@ -19,16 +19,13 @@ package admission import ( "encoding/json" "fmt" - "math/rand" - "strconv" - "time" - "github.com/golang/glog" + "strconv" "k8s.io/api/admission/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" ) type patchOperation struct { @@ -74,7 +71,6 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { func createPatch(job v1alpha1.Job) ([]byte, error) { var patch []patchOperation patch = append(patch, mutateSpec(job.Spec.Tasks, "/spec/tasks")...) - patch = append(patch, mutateMetadata(job.ObjectMeta, "/metadata")...) return json.Marshal(patch) } @@ -95,30 +91,3 @@ func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) (patch []patchOperat return patch } - -func mutateMetadata(metadata metav1.ObjectMeta, basePath string) (patch []patchOperation) { - if len(metadata.Annotations) == 0 { - metadata.Annotations = make(map[string]string) - } - randomStr := genRandomStr(5) - metadata.Annotations[PVCInputName] = fmt.Sprintf("%s-input-%s", metadata.Name, randomStr) - metadata.Annotations[PVCOutputName] = fmt.Sprintf("%s-output-%s", metadata.Name, randomStr) - patch = append(patch, patchOperation{ - Op: "replace", - Path: basePath, - Value: metadata, - }) - - return patch -} - -func genRandomStr(l int) string { - str := "0123456789abcdefghijklmnopqrstuvwxyz" - bytes := []byte(str) - result := []byte{} - r := rand.New(rand.NewSource(time.Now().UnixNano())) - for i := 0; i < l; i++ { - result = append(result, bytes[r.Intn(len(bytes))]) - } - return string(result) -} diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index f0d50c6176..84b4f576eb 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -47,32 +47,29 @@ type JobSpec struct { // +optional MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"` - // The volume mount for input of Job - Input *VolumeSpec `json:"input,omitempty" protobuf:"bytes,3,opt,name=input"` - - // The volume mount for output of Job - Output *VolumeSpec `json:"output,omitempty" protobuf:"bytes,4,opt,name=output"` + // The volumes mount on Job + Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,3,opt,name=volumes"` // Tasks specifies the task specification of Job // +optional - Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"` + Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,4,opt,name=tasks"` // Specifies the default lifecycle of tasks // +optional - Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,6,opt,name=policies"` + Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"` // Specifies the plugin of job // Key is plugin name, value is the arguments of the plugin // +optional - Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,7,opt,name=plugins"` + Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,6,opt,name=plugins"` //Specifies the queue that will be used in the scheduler, "default" queue is used this leaves empty. - Queue string `json:"queue,omitempty" protobuf:"bytes,8,opt,name=queue"` + Queue string `json:"queue,omitempty" protobuf:"bytes,7,opt,name=queue"` // Specifies the maximum number of retries before marking this Job failed. // Defaults to 3. // +optional - MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,9,opt,name=maxRetry"` + MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,8,opt,name=maxRetry"` } // VolumeSpec defines the specification of Volume, e.g. PVC @@ -81,8 +78,11 @@ type VolumeSpec struct { // not contain ':'. MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"` + // defined the PVC name + VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"` + // VolumeClaim defines the PVC used by the VolumeMount. - VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,1,opt,name=volumeClaim"` + VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,3,opt,name=volumeClaim"` } type JobEvent string diff --git a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go index 6b844540fb..ab368e414e 100644 --- a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go @@ -90,15 +90,12 @@ func (in *JobList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JobSpec) DeepCopyInto(out *JobSpec) { *out = *in - if in.Input != nil { - in, out := &in.Input, &out.Input - *out = new(VolumeSpec) - (*in).DeepCopyInto(*out) - } - if in.Output != nil { - in, out := &in.Output, &out.Output - *out = new(VolumeSpec) - (*in).DeepCopyInto(*out) + if in.Volumes != nil { + in, out := &in.Volumes, &out.Volumes + *out = make([]VolumeSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } if in.Tasks != nil { in, out := &in.Tasks, &out.Tasks diff --git a/pkg/controllers/job/helpers/helpers.go b/pkg/controllers/job/helpers/helpers.go index 93a42f5fba..ca44a7cadc 100644 --- a/pkg/controllers/job/helpers/helpers.go +++ b/pkg/controllers/job/helpers/helpers.go @@ -18,9 +18,11 @@ package helpers import ( "fmt" + "math/rand" "strings" + "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) const ( @@ -39,3 +41,14 @@ func GetTaskIndex(pod *v1.Pod) string { func MakePodName(jobName string, taskName string, index int) string { return fmt.Sprintf(PodNameFmt, jobName, taskName, index) } + +func GenRandomStr(l int) string { + str := "0123456789abcdefghijklmnopqrstuvwxyz" + bytes := []byte(str) + result := []byte{} + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < l; i++ { + result = append(result, bytes[r.Intn(len(bytes))]) + } + return string(result) +} diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 4181f6e55e..ca9c7e5132 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -29,8 +29,6 @@ import ( kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" kbapi "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" - - admissioncontroller "volcano.sh/volcano/pkg/admission" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/controllers/apis" @@ -143,12 +141,17 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta job := jobInfo.Job glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) - if err := cc.createPodGroupIfNotExist(job); err != nil { + newJob, err := cc.needUpdateForVolumeClaim(job) + if err != nil { return err } - - if err := cc.createJobIOIfNotExist(job); err != nil { - return err + if newJob != nil { + if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil { + glog.Errorf("Failed to update Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } + return nil } if err := cc.pluginOnJobAdd(job); err != nil { @@ -157,6 +160,14 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta return err } + if err := cc.createPodGroupIfNotExist(job); err != nil { + return err + } + + if err := cc.createJobIOIfNotExist(job); err != nil { + return err + } + return nil } @@ -320,68 +331,89 @@ func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 { } func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { - // If input/output PVC does not exist, create them for Job. - inputPVC := job.Annotations[admissioncontroller.PVCInputName] - outputPVC := job.Annotations[admissioncontroller.PVCOutputName] - if job.Spec.Input != nil { - if job.Spec.Input.VolumeClaim != nil { - if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) + // If PVC does not exist, create them for Job. + volumes := job.Spec.Volumes + for _, volume := range volumes { + vcName := volume.VolumeClaimName + exist, err := cc.checkPVCExist(job, vcName) + if err != nil { + return err + } + if !exist { + if job.Status.ControlledResources == nil { + job.Status.ControlledResources = make(map[string]string) + } + if volume.VolumeClaim != nil { + if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil { return err } + job.Status.ControlledResources["volume-pvc-"+vcName] = vcName + } else { + job.Status.ControlledResources["volume-emptyDir-"+vcName] = vcName + } + } + } + return nil +} - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: inputPVC, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - }, - Spec: *job.Spec.Input.VolumeClaim, +func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) { + // If VolumeClaimName does not exist, generate them for Job. + var newJob *vkv1.Job + volumes := job.Spec.Volumes + for index, volume := range volumes { + vcName := volume.VolumeClaimName + if len(vcName) == 0 { + for { + randomStr := vkjobhelpers.GenRandomStr(12) + vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr) + exist, err := cc.checkPVCExist(job, vcName) + if err != nil { + return nil, err } - - glog.V(3).Infof("Try to create input PVC: %v", pvc) - - if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil { - glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return e + if exist { + continue + } + if newJob == nil { + newJob = job.DeepCopy() } + newJob.Spec.Volumes[index].VolumeClaimName = vcName + break } } } - if job.Spec.Output != nil { - if job.Spec.Output.VolumeClaim != nil { - if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - //return err - } + return newJob, nil +} - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: outputPVC, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - }, - Spec: *job.Spec.Output.VolumeClaim, - } +func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) { + if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(vcName); err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + glog.V(3).Infof("Failed to get PVC for job <%s/%s>: %v", + job.Namespace, job.Name, err) + return false, err + } + return true, nil +} - glog.V(3).Infof("Try to create output PVC: %v", pvc) +func (cc *Controller) createPVC(job *vkv1.Job, vcName string, volumeClaim *v1.PersistentVolumeClaimSpec) error { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: vcName, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + }, + Spec: *volumeClaim, + } - if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil { - glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return e - } - } - } + glog.V(3).Infof("Try to create PVC: %v", pvc) + + if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil { + glog.V(3).Infof("Failed to create PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, e) + return e } return nil } diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 085c3a8332..cd86335998 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -21,12 +21,10 @@ import ( "github.com/golang/glog" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kbapi "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - admissioncontroller "volcano.sh/volcano/pkg/admission" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/controllers/apis" @@ -54,7 +52,7 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: MakePodName(job.Name, template.Name, ix), + Name: vkjobhelpers.MakePodName(job.Name, template.Name, ix), Namespace: job.Namespace, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(job, helpers.JobKind), @@ -70,61 +68,34 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { pod.Spec.SchedulerName = job.Spec.SchedulerName } - inputPVC := job.Annotations[admissioncontroller.PVCInputName] - outputPVC := job.Annotations[admissioncontroller.PVCOutputName] - if job.Spec.Output != nil { - if job.Spec.Output.VolumeClaim == nil { - volume := v1.Volume{ - Name: outputPVC, - } - volume.EmptyDir = &v1.EmptyDirVolumeSource{} - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) - } else { - volume := v1.Volume{ - Name: outputPVC, - } - volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: outputPVC, - } - - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) - } - - for i, c := range pod.Spec.Containers { - vm := v1.VolumeMount{ - MountPath: job.Spec.Output.MountPath, - Name: outputPVC, - } - pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm) - } - } - - if job.Spec.Input != nil { - if job.Spec.Input.VolumeClaim == nil { - volume := v1.Volume{ - Name: inputPVC, - } - volume.EmptyDir = &v1.EmptyDirVolumeSource{} - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) - } else { - volume := v1.Volume{ - Name: inputPVC, - } - volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: inputPVC, + volumeMap := make(map[string]bool) + for _, volume := range job.Spec.Volumes { + vcName := volume.VolumeClaimName + if _, ok := volumeMap[vcName]; !ok { + if _, ok := job.Status.ControlledResources["volume-emptyDir-"+vcName]; ok && volume.VolumeClaim == nil { + volume := v1.Volume{ + Name: vcName, + } + volume.EmptyDir = &v1.EmptyDirVolumeSource{} + pod.Spec.Volumes = append(pod.Spec.Volumes, volume) + } else { + volume := v1.Volume{ + Name: vcName, + } + volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: vcName, + } + pod.Spec.Volumes = append(pod.Spec.Volumes, volume) } - - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) + volumeMap[vcName] = true } for i, c := range pod.Spec.Containers { vm := v1.VolumeMount{ - MountPath: job.Spec.Input.MountPath, - Name: inputPVC, + MountPath: volume.MountPath, + Name: vcName, } - pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm) - } }