Merge pull request #190 from SrinivasChilveri/SynchJobConcurrently
sync job works concurrently
volcano-sh-bot committed Jul 12, 2019
2 parents d11e920 + eee74aa commit c546f9f
Showing 8 changed files with 121 additions and 28 deletions.
10 changes: 8 additions & 2 deletions cmd/controllers/app/options/options.go
@@ -23,8 +23,9 @@ import (
)

const (
defaultQPS = 50.0
defaultBurst = 100
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
)

// ServerOption is the main context object for the controller manager.
@@ -36,6 +37,9 @@ type ServerOption struct {
KubeAPIBurst int
KubeAPIQPS float32
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating, but more CPU load.
WorkerThreads uint32
}

// NewServerOption creates a new CMServer with a default config.
@@ -54,6 +58,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
}

// CheckOptionOrDie checks the LockObjectNamespace
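The new --worker-threads flag defaults to 3 and sits alongside the existing QPS/burst flags. A minimal stand-alone sketch of how the flag parses, assuming the same github.com/spf13/pflag package the options file already imports (the flag-set name and sample argument are illustrative):

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("vc-controllers", pflag.ExitOnError)

	// Mirrors ServerOption.AddFlags: worker-threads defaults to 3 when omitted.
	workers := fs.Uint32("worker-threads", 3,
		"The number of threads syncing job operations concurrently")

	// Illustrative invocation; drop the argument to get the default of 3.
	fs.Parse([]string{"--worker-threads=8"})
	fmt.Println(*workers) // 8
}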
9 changes: 5 additions & 4 deletions cmd/controllers/app/options/options_test.go
@@ -35,10 +35,11 @@ func TestAddFlags(t *testing.T) {

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
}

if !reflect.DeepEqual(expected, s) {
2 changes: 1 addition & 1 deletion cmd/controllers/app/server.go
@@ -85,7 +85,7 @@ func Run(opt *options.ServerOption) error {
kbClient := kbver.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

jobController := job.NewJobController(kubeClient, kbClient, vkClient)
jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)

6 changes: 6 additions & 0 deletions pkg/controllers/job/helpers/helpers.go
@@ -22,6 +22,7 @@ import (
"math/rand"
"strings"
"time"
"volcano.sh/volcano/pkg/controllers/apis"
)

const (
@@ -61,3 +62,8 @@ func genRandomStr(l int) string {
func MakeVolumeClaimName(jobName string) string {
return fmt.Sprintf(VolumeClaimFmt, jobName, genRandomStr(12))
}

// GetJobKeyByReq gets the key for the job request
func GetJobKeyByReq(req *apis.Request) string {
return fmt.Sprintf("%s/%s", req.Namespace, req.JobName)
}
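GetJobKeyByReq simply joins namespace and job name; this is the key the controller later hashes to pick a worker queue. A self-contained sketch of the behavior (the Request struct below is a pared-down stand-in for apis.Request, and the job name is made up):

package main

import "fmt"

// Request is a stand-in for apis.Request, carrying only the
// fields GetJobKeyByReq reads.
type Request struct {
	Namespace string
	JobName   string
}

func GetJobKeyByReq(req *Request) string {
	return fmt.Sprintf("%s/%s", req.Namespace, req.JobName)
}

func main() {
	req := &Request{Namespace: "default", JobName: "mnist-train"}
	fmt.Println(GetJobKeyByReq(req)) // prints "default/mnist-train"
}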
82 changes: 72 additions & 10 deletions pkg/controllers/job/job_controller.go
@@ -18,7 +18,10 @@ package job

import (
"fmt"
"hash"
"hash/fnv"
"sync"
"time"

"github.com/golang/glog"

@@ -95,7 +98,7 @@ type Controller struct {
pcSynced func() bool

// queue that need to sync up
queue workqueue.RateLimitingInterface
queueList []workqueue.RateLimitingInterface
commandQueue workqueue.RateLimitingInterface
cache jobcache.Cache
//Job Event recorder
@@ -104,13 +107,15 @@

sync.Mutex
errTasks workqueue.RateLimitingInterface
workers uint32
}

// NewJobController create new Job Controller
func NewJobController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
vkClient vkver.Interface,
workers uint32,
) *Controller {

//Initialize event client
@@ -123,12 +128,17 @@ func NewJobController(
kubeClients: kubeClient,
vkClients: vkClient,
kbClients: kbClient,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queueList: make([]workqueue.RateLimitingInterface, workers, workers),
commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
cache: jobcache.New(),
errTasks: newRateLimitingQueue(),
recorder: recorder,
priorityClasses: make(map[string]*v1beta1.PriorityClass),
workers: workers,
}
var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
}

cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Batch().V1alpha1().Jobs()
@@ -192,6 +202,7 @@ func NewJobController(

// Run start JobController
func (cc *Controller) Run(stopCh <-chan struct{}) {

go cc.sharedInformers.Start(stopCh)
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
@@ -205,7 +216,17 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced)

go wait.Until(cc.handleCommands, 0, stopCh)
go wait.Until(cc.worker, 0, stopCh)
var i uint32
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num)
},
time.Second,
stopCh)
}(i)
}

go cc.cache.Run(stopCh)

@@ -215,20 +236,61 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
glog.Infof("JobController is running ...... ")
}

func (cc *Controller) worker() {
for cc.processNextReq() {
func (cc *Controller) worker(i uint32) {
glog.Infof("worker %d start ...... ", i)

for cc.processNextReq(i) {
}
}

func (cc *Controller) processNextReq() bool {
obj, shutdown := cc.queue.Get()
func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
var hashVal hash.Hash32
var val uint32

hashVal = fnv.New32()
hashVal.Write([]byte(key))

val = hashVal.Sum32()

if val%cc.workers == count {
return true
}

return false
}

func (cc *Controller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
var hashVal hash.Hash32
var val uint32

hashVal = fnv.New32()
hashVal.Write([]byte(key))

val = hashVal.Sum32()

queue := cc.queueList[val%cc.workers]

return queue
}

func (cc *Controller) processNextReq(count uint32) bool {
queue := cc.queueList[count]
obj, shutdown := queue.Get()
if shutdown {
glog.Errorf("Fail to pop item from queue")
return false
}

req := obj.(apis.Request)
defer cc.queue.Done(req)
defer queue.Done(req)

key := jobcache.JobKeyByReq(&req)
if !cc.belongsToThisRoutine(key, count) {
glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
queueLocal := cc.getWorkerQueue(key)
queueLocal.Add(req)
return true
}

glog.V(3).Infof("Try to handle request <%v>", req)

@@ -259,12 +321,12 @@ func (cc *Controller) processNextReq() bool {
glog.Errorf("Failed to handle Job <%s/%s>: %v",
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
cc.queue.AddRateLimited(req)
queue.AddRateLimited(req)
return true
}

// If no error, forget it.
cc.queue.Forget(req)
queue.Forget(req)

return true
}
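Both belongsToThisRoutine and getWorkerQueue apply the same rule: FNV-32 hash of the job key, modulo the worker count. Because the hash is deterministic, every event for a given job lands on the same queue and is handled by one goroutine, preserving per-job ordering while unrelated jobs sync in parallel. A runnable sketch of just the routing rule (the key names are illustrative):

package main

import (
	"fmt"
	"hash/fnv"
)

// shard reproduces the routing rule shared by getWorkerQueue and
// belongsToThisRoutine: FNV-32 hash of the job key, modulo worker count.
func shard(key string, workers uint32) uint32 {
	h := fnv.New32()
	h.Write([]byte(key))
	return h.Sum32() % workers
}

func main() {
	const workers uint32 = 3
	for _, key := range []string{"default/job-a", "default/job-b", "kube-system/job-a"} {
		// A given key always maps to the same worker index.
		fmt.Printf("%-18s -> worker %d\n", key, shard(key, workers))
	}
}

With the handlers routing through getWorkerQueue, the belongsToThisRoutine check in processNextReq acts as a defensive assertion: a mismatch should never occur, and if it somehow does, the worker re-routes the request to the correct queue and moves on.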
29 changes: 22 additions & 7 deletions pkg/controllers/job/job_controller_handler.go
@@ -34,6 +34,7 @@ import (

"volcano.sh/volcano/pkg/controllers/apis"
vkcache "volcano.sh/volcano/pkg/controllers/cache"
vkjobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
)

func (cc *Controller) addCommand(obj interface{}) {
@@ -65,7 +66,9 @@ func (cc *Controller) addJob(obj interface{}) {
glog.Errorf("Failed to add job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) updateJob(oldObj, newObj interface{}) {
@@ -100,7 +103,9 @@
Event: vkbatchv1.OutOfSyncEvent,
}

cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) deleteJob(obj interface{}) {
@@ -166,7 +171,9 @@ func (cc *Controller) addPod(obj interface{}) {
glog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
pod.Namespace, pod.Name, err)
}
cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) updatePod(oldObj, newObj interface{}) {
@@ -247,7 +254,9 @@
JobVersion: int32(dVersion),
}

cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) deletePod(obj interface{}) {
@@ -304,7 +313,9 @@
pod.Namespace, pod.Name, err)
}

cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.JobEvent, message string) {
@@ -349,7 +360,9 @@ func (cc *Controller) processNextCommand() bool {
Action: vkbatchv1.Action(cmd.Action),
}

cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)

return true
}
@@ -384,7 +397,9 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
case kbtype.PodGroupInqueue:
req.Action = vkbatchv1.EnqueueAction
}
cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
}

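Every handler in this file now repeats the same three-line routing idiom: build the job key, look up the shard queue, enqueue. A hypothetical helper, not part of this commit, could fold the pattern into one call; sketched as it would sit inside pkg/controllers/job, reusing that package's existing imports:

// enqueueJob is a hypothetical convenience wrapper, not present in this
// change; it captures the pattern repeated in addJob, updateJob, addPod,
// updatePod, deletePod, processNextCommand and updatePodGroup.
func (cc *Controller) enqueueJob(req apis.Request) {
	key := vkjobhelpers.GetJobKeyByReq(&req)
	queue := cc.getWorkerQueue(key)
	queue.Add(req)
}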
9 changes: 6 additions & 3 deletions pkg/controllers/job/job_controller_handler_test.go
@@ -60,7 +60,7 @@ func newController() *Controller {
}

vkclient := vkclientset.NewForConfigOrDie(config)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 3)

return controller
}
@@ -162,7 +162,8 @@ func TestJobAddFunc(t *testing.T) {
if job == nil || err != nil {
t.Errorf("Error while Adding Job in case %d with error %s", i, err)
}
len := controller.queue.Len()
queue := controller.getWorkerQueue(key)
len := queue.Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
}
@@ -513,7 +514,9 @@ func TestUpdatePodGroupFunc(t *testing.T) {
for i, testcase := range testCases {
controller := newController()
controller.updatePodGroup(testcase.oldPodGroup, testcase.newPodGroup)
len := controller.queue.Len()
key := fmt.Sprintf("%s/%s", testcase.oldPodGroup.Namespace, testcase.oldPodGroup.Name)
queue := controller.getWorkerQueue(key)
len := queue.Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
}
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_plugins_test.go
@@ -35,7 +35,7 @@ func newFakeController() *Controller {
VolcanoClientSet := volcanoclient.NewSimpleClientset()
KubeClientSet := kubeclient.NewSimpleClientset()

controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet)
controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 3)
return controller
}

