Skip to content

Commit

Permalink
Combine input & output volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyLike committed May 5, 2019
1 parent 072b95d commit 71defce
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 168 deletions.
7 changes: 3 additions & 4 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
}
}

//TODO(tommylikehu): Fix me and enable it.
//if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok {
// msg = msg + validateInfo
//}
if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok {
msg = msg + validateInfo
}

if msg != "" {
reviewResponse.Allowed = false
Expand Down
35 changes: 2 additions & 33 deletions pkg/admission/mutate_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ package admission
import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"time"

"github.com/golang/glog"
"strconv"

"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

type patchOperation struct {
Expand Down Expand Up @@ -74,7 +71,6 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
func createPatch(job v1alpha1.Job) ([]byte, error) {
var patch []patchOperation
patch = append(patch, mutateSpec(job.Spec.Tasks, "/spec/tasks")...)
patch = append(patch, mutateMetadata(job.ObjectMeta, "/metadata")...)

return json.Marshal(patch)
}
Expand All @@ -95,30 +91,3 @@ func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) (patch []patchOperat

return patch
}

func mutateMetadata(metadata metav1.ObjectMeta, basePath string) (patch []patchOperation) {
if len(metadata.Annotations) == 0 {
metadata.Annotations = make(map[string]string)
}
randomStr := genRandomStr(5)
metadata.Annotations[PVCInputName] = fmt.Sprintf("%s-input-%s", metadata.Name, randomStr)
metadata.Annotations[PVCOutputName] = fmt.Sprintf("%s-output-%s", metadata.Name, randomStr)
patch = append(patch, patchOperation{
Op: "replace",
Path: basePath,
Value: metadata,
})

return patch
}

func genRandomStr(l int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
result := []byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < l; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}
22 changes: 11 additions & 11 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,29 @@ type JobSpec struct {
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"`

// The volume mount for input of Job
Input *VolumeSpec `json:"input,omitempty" protobuf:"bytes,3,opt,name=input"`

// The volume mount for output of Job
Output *VolumeSpec `json:"output,omitempty" protobuf:"bytes,4,opt,name=output"`
// The volumes mount on Job
Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,3,opt,name=volumes"`

// Tasks specifies the task specification of Job
// +optional
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"`
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,4,opt,name=tasks"`

// Specifies the default lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,6,opt,name=policies"`
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`

// Specifies the plugin of job
// Key is plugin name, value is the arguments of the plugin
// +optional
Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,7,opt,name=plugins"`
Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,6,opt,name=plugins"`

//Specifies the queue that will be used in the scheduler, "default" queue is used this leaves empty.
Queue string `json:"queue,omitempty" protobuf:"bytes,8,opt,name=queue"`
Queue string `json:"queue,omitempty" protobuf:"bytes,7,opt,name=queue"`

// Specifies the maximum number of retries before marking this Job failed.
// Defaults to 3.
// +optional
MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,9,opt,name=maxRetry"`
MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,8,opt,name=maxRetry"`
}

// VolumeSpec defines the specification of Volume, e.g. PVC
Expand All @@ -81,8 +78,11 @@ type VolumeSpec struct {
// not contain ':'.
MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`

// defined the PVC name
VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"`

// VolumeClaim defines the PVC used by the VolumeMount.
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,1,opt,name=volumeClaim"`
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,3,opt,name=volumeClaim"`
}

type JobEvent string
Expand Down
15 changes: 6 additions & 9 deletions pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion pkg/controllers/job/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package helpers

import (
"fmt"
"math/rand"
"strings"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

const (
Expand All @@ -39,3 +41,14 @@ func GetTaskIndex(pod *v1.Pod) string {
func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(PodNameFmt, jobName, taskName, index)
}

func GenRandomStr(l int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
result := []byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < l; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}
146 changes: 89 additions & 57 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (

kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kbapi "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"

admissioncontroller "volcano.sh/volcano/pkg/admission"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
"volcano.sh/volcano/pkg/controllers/apis"
Expand Down Expand Up @@ -143,12 +141,17 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
job := jobInfo.Job
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

if err := cc.createPodGroupIfNotExist(job); err != nil {
newJob, err := cc.needUpdateForVolumeClaim(job)
if err != nil {
return err
}

if err := cc.createJobIOIfNotExist(job); err != nil {
return err
if newJob != nil {
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return nil
}

if err := cc.pluginOnJobAdd(job); err != nil {
Expand All @@ -157,6 +160,14 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
return err
}

if err := cc.createPodGroupIfNotExist(job); err != nil {
return err
}

if err := cc.createJobIOIfNotExist(job); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -320,68 +331,89 @@ func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 {
}

func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
// If input/output PVC does not exist, create them for Job.
inputPVC := job.Annotations[admissioncontroller.PVCInputName]
outputPVC := job.Annotations[admissioncontroller.PVCOutputName]
if job.Spec.Input != nil {
if job.Spec.Input.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
// If PVC does not exist, create them for Job.
volumes := job.Spec.Volumes
for _, volume := range volumes {
vcName := volume.VolumeClaimName
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err
}
if !exist {
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
if volume.VolumeClaim != nil {
if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil {
return err
}
job.Status.ControlledResources["volume-pvc-"+vcName] = vcName
} else {
job.Status.ControlledResources["volume-emptyDir-"+vcName] = vcName
}
}
}
return nil
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: inputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *job.Spec.Input.VolumeClaim,
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) {
// If VolumeClaimName does not exist, generate them for Job.
var newJob *vkv1.Job
volumes := job.Spec.Volumes
for index, volume := range volumes {
vcName := volume.VolumeClaimName
if len(vcName) == 0 {
for {
randomStr := vkjobhelpers.GenRandomStr(12)
vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return nil, err
}

glog.V(3).Infof("Try to create input PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
if exist {
continue
}
if newJob == nil {
newJob = job.DeepCopy()
}
newJob.Spec.Volumes[index].VolumeClaimName = vcName
break
}
}
}
if job.Spec.Output != nil {
if job.Spec.Output.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
//return err
}
return newJob, nil
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: outputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *job.Spec.Output.VolumeClaim,
}
func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(vcName); err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
glog.V(3).Infof("Failed to get PVC for job <%s/%s>: %v",
job.Namespace, job.Name, err)
return false, err
}
return true, nil
}

glog.V(3).Infof("Try to create output PVC: %v", pvc)
func (cc *Controller) createPVC(job *vkv1.Job, vcName string, volumeClaim *v1.PersistentVolumeClaimSpec) error {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: vcName,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *volumeClaim,
}

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
}
glog.V(3).Infof("Try to create PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, e)
return e
}
return nil
}
Expand Down
Loading

0 comments on commit 71defce

Please sign in to comment.