
sync job works concurrently #190

Merged

Conversation

SrinivasChilveri
Contributor

sync jobs concurrently

Fix: #93

@SrinivasChilveri
Contributor Author

@hzxuzhonghu, please take a look.

-go wait.Until(cc.worker, 0, stopCh)
+for i := 0; i < workers; i++ {
+	go wait.Until(cc.worker, time.Second, stopCh)
Contributor

Why do we use time.Second here but not 0 like the old logic? ...

Contributor Author

@lminzhw I followed the way the k8s controller-manager controllers do it. If we have many controllers, I think it is better to have some interval instead of zero. Maybe we can also make this a config parameter; what do you say?

Contributor

I browsed some controller code in k8s and realized that most k8s controllers use time.Second as the interval, together with multiple workers, to handle the large-scale case.

Seems good :)
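
For reference, a minimal sketch of the start-up pattern under discussion: N worker goroutines, each restarted by wait.Until with a one-second period. Only Run, worker and the wait.Until call come from the diff above; the package layout, struct and method bodies are illustrative assumptions, not the actual volcano code.

```go
package job

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// Controller stands in for the real job controller in this sketch.
type Controller struct{}

// worker drains the work queue until it is shut down (body elided here).
func (cc *Controller) worker() {}

// Run starts `workers` goroutines. wait.Until re-enters the worker loop one
// second after it returns, which is the interval the k8s controller-manager
// controllers also use.
func (cc *Controller) Run(workers int, stopCh <-chan struct{}) {
	for i := 0; i < workers; i++ {
		go wait.Until(cc.worker, time.Second, stopCh)
	}
	<-stopCh
}
```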

@hzxuzhonghu
Collaborator

@SrinivasChilveri it is not that simple to run multiple workers concurrently; I think the workers may conflict when operating on the same Job. We have to prevent this.

@SrinivasChilveri
Contributor Author

> @SrinivasChilveri it is not that simple to run multiple workers concurrently; I think the workers may conflict when operating on the same Job. We have to prevent this.

Yeah, our controller can have multiple entries related to the same job in the queue. I have pushed the commit; please take a look.

@SrinivasChilveri
Contributor Author

@hzxuzhonghu, added a new commit for your comment. Please take a look.

cc.jobsLock.Lock()
if cc.jobsMap[key] {
	// the job is being processed by some other thread
	cc.queue.AddRateLimited(req)
Member

If it conflicts a few times, it will take a really long time to retry the job. It's better to shard the requests to different channels/workers.

Collaborator

Also, this re-enqueue operation can lead to requests being processed in unexpected orders, though I have no detailed scenario right now.

Contributor Author

@k82cn, @hzxuzhonghu, based on my understanding, sharding with a single queue will not solve this issue, so I have added a queueList with one queue per worker and pushed the commit. Please review.

// Later we can remove this code if we want
if !cc.belongsToThisRoutine(key, count) {
	glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
	cc.queueList[count].AddRateLimited(req)
Member

Do not use AddRateLimited in this case; just hash each request by job into a different queue, and have several workers handle them.
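
A rough sketch of the hashing approach suggested here: the workers/queueList field names come from the PR discussion, while the helper name getWorkerQueue and the choice of fnv are assumptions for illustration only.

```go
package job

import (
	"hash/fnv"

	"k8s.io/client-go/util/workqueue"
)

// Controller holds one rate-limited queue per worker in this sketch.
type Controller struct {
	workers   uint32
	queueList []workqueue.RateLimitingInterface
}

// getWorkerQueue hashes the job key (namespace/name) onto a fixed queue index,
// so every event for the same job always lands in the same queue and is
// therefore handled by exactly one worker goroutine.
func (cc *Controller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
	h := fnv.New32a()
	// Write on a hash.Hash never returns an error, so it is safe to ignore.
	h.Write([]byte(key))
	return cc.queueList[h.Sum32()%cc.workers]
}
```

With something like this in place, each event handler can call cc.getWorkerQueue(key).Add(req) instead of taking a lock or re-enqueueing on conflict.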

Contributor Author

changed

Collaborator

@hzxuzhonghu left a comment

some nits

-func (cc *Controller) processNextReq() bool {
-	obj, shutdown := cc.queue.Get()
+func (cc *Controller) processNextReq(count uint32) bool {
+	obj, shutdown := cc.queueList[count].Get()
Collaborator

nit: I would like to acquire the queue before this loop

Contributor Author

changed
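
One possible reading of the nit above, sketched below: the worker looks its queue up once and hands it to processNextReq, instead of indexing queueList on every iteration. The names follow the PR; the bodies and the exact signature are simplified assumptions, not the merged code.

```go
package job

import "k8s.io/client-go/util/workqueue"

// Controller keeps one queue per worker in this sketch.
type Controller struct {
	queueList []workqueue.RateLimitingInterface
}

// worker acquires its queue once, before the processing loop.
func (cc *Controller) worker(i uint32) {
	queue := cc.queueList[i]
	for cc.processNextReq(queue) {
	}
}

// processNextReq takes the queue directly rather than an index into queueList.
func (cc *Controller) processNextReq(queue workqueue.RateLimitingInterface) bool {
	req, shutdown := queue.Get()
	if shutdown {
		return false
	}
	defer queue.Done(req)

	// ... sync the job here; on success forget the request, on a retryable
	// error re-add it with rate limiting ...
	queue.Forget(req)
	return true
}
```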

defer cc.queueList[count].Done(req)

key := jobcache.JobKeyByReq(&req)
// Later we can remove this code if we want
Collaborator

why add this?

Contributor Author

Basically we are sharding the jobs and adding them into different queues. This function only reads from its own queue, so this condition should never be hit: the job request is placed into a queue based on its shard, and when reading it should belong to the same shard/queue/worker.

Member

let's remove that


}

// prevent multi threads processing the same job simultaneously.
cc.jobsLock.Lock()
Collaborator

This would not happen if you do sharding in the source.

Contributor Author

agreed, will remove the code

@@ -241,7 +247,9 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
		JobVersion: int32(dVersion),
	}

-	cc.queue.Add(req)
+	key := vkcache.JobKeyByReq(&req)
+	i := cc.getWorkerID(key)
Collaborator

I think this can guarantee that worker threads will not process the same job.

Contributor Author

yes

@hzxuzhonghu
Collaborator

I'd like to see some tests on concurrent access to the job cache.

@SrinivasChilveri
Contributor Author

SrinivasChilveri commented Jun 18, 2019

> I'd like to see some tests on concurrent access to the job cache.

Better to do that in a separate PR.

@hzxuzhonghu
Collaborator

LGTM, leaving it to @k82cn for a final look.

@SrinivasChilveri
Contributor Author

@k82cn @hzxuzhonghu, please take a look and let me know if I need to update anything else.

@volcano-sh-bot added the needs-rebase label (indicates a PR cannot be merged because it has merge conflicts with HEAD) on Jun 27, 2019
@volcano-sh-bot added the size/L label (denotes a PR that changes 100-499 lines, ignoring generated files) on Jun 27, 2019
@SrinivasChilveri
Contributor Author

@k82cn, the PR is rebased.

@volcano-sh-bot removed the needs-rebase label on Jun 27, 2019
@SrinivasChilveri
Contributor Author

@k82cn do I need to change anything else?

}

// TODO we may need to make this sharding more proper if required
func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface {
Contributor

This function doesn't do what its name describes.

Contributor Author

changed the name

@TommyLike
Contributor

/lgtm

@volcano-sh-bot added the lgtm label (indicates that a PR is ready to be merged) on Jul 9, 2019
@TommyLike
Contributor

cc @k82cn @hzxuzhonghu

@hzxuzhonghu
Collaborator

/lgtm

queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)

run := func(ctx context.Context) {
-	go jobController.Run(ctx.Done())
+	go jobController.Run(opt.WorkerThreads, ctx.Done())
Member

We already create jobController with opt.WorkerThreads; why does Run still need this parameter?

Contributor Author

changed
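
The change being asked for here has roughly the following shape: the worker count is passed once to NewJobController and stored on the controller, so Run only needs a stop channel. Apart from the NewJobController/Run names and the workers count, everything below is an illustrative assumption.

```go
package job

// Controller remembers how many workers it was created with.
type Controller struct {
	workers uint32
}

// NewJobController stores the worker count at construction time.
func NewJobController(workers uint32) *Controller {
	return &Controller{workers: workers}
}

// Run no longer takes the worker count; callers just pass a stop channel,
// e.g. go jobController.Run(ctx.Done()) in the controller-manager main.
func (cc *Controller) Run(stopCh <-chan struct{}) {
	for i := uint32(0); i < cc.workers; i++ {
		go cc.worker(i)
	}
	<-stopCh
}

// worker drains the i-th queue until shutdown (elided in this sketch).
func (cc *Controller) worker(i uint32) {}
```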

@@ -21,6 +21,9 @@ import (
	"sync"

	"github.com/golang/glog"
+	"hash"
Member

Put it close to the other system libs.

Contributor Author

changed

}

// NewJobController create new Job Controller
func NewJobController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
vkClient vkver.Interface,
workers uint32,

Member

remove empty line

Contributor Author

removed

-func (cc *Controller) worker() {
-	for cc.processNextReq() {
+func (cc *Controller) worker(i uint32) {

Member

remove empty line.

Contributor Author

removed

time.Second,
stopCh)
}(i)

Member

remove empty line.

Contributor Author

removed

@@ -65,7 +65,9 @@ func (cc *Controller) addJob(obj interface{}) {
		glog.Errorf("Failed to add job <%s/%s>: %v in cache",
			job.Namespace, job.Name, err)
	}
-	cc.queue.Add(req)
+	key := vkcache.JobKeyByReq(&req)
Member

Introduce a helper function for that.

Contributor Author

done
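
The requested helper could look something like the sketch below: one place that derives the job key, picks the shard, and adds the request, so addJob, updatePod and the other event handlers do not repeat it. The Request fields and the namespace/name key format follow the snippets in this PR; the helper name enqueueJob and the fnv hash (the same queue selection shown earlier) are assumptions.

```go
package job

import (
	"fmt"
	"hash/fnv"

	"k8s.io/client-go/util/workqueue"
)

// Request identifies the job an event belongs to (trimmed for this sketch).
type Request struct {
	Namespace string
	JobName   string
}

// jobKeyByReq mirrors what vkcache.JobKeyByReq does: the namespace/name key.
func jobKeyByReq(req *Request) string {
	return fmt.Sprintf("%s/%s", req.Namespace, req.JobName)
}

// Controller holds one queue per worker.
type Controller struct {
	workers   uint32
	queueList []workqueue.RateLimitingInterface
}

// enqueueJob hides the sharding from the event handlers: hash the job key,
// pick the owning queue, and add the request.
func (cc *Controller) enqueueJob(req Request) {
	h := fnv.New32a()
	h.Write([]byte(jobKeyByReq(&req)))
	cc.queueList[h.Sum32()%cc.workers].Add(req)
}
```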

@volcano-sh-bot removed the lgtm label on Jul 10, 2019
@SrinivasChilveri
Contributor Author

@hzxuzhonghu @k82cn @TommyLike, fixed all the comments.

@hzxuzhonghu
Collaborator

/lgtm

@volcano-sh-bot added the lgtm label on Jul 10, 2019
@hzxuzhonghu
Collaborator

/approve

@SrinivasChilveri
Contributor Author

@TommyLike @hex108 any other suggestions?

@hzxuzhonghu
Collaborator

ping @k82cn for approval

@TommyLike
Contributor

/lgtm

@k82cn
Member

k82cn commented Jul 12, 2019

/approve

@volcano-sh-bot
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: hzxuzhonghu, k82cn, SrinivasChilveri


@volcano-sh-bot added the approved label (indicates a PR has been approved by an approver from all required OWNERS files) on Jul 12, 2019
@volcano-sh-bot merged commit c546f9f into volcano-sh:master on Jul 12, 2019
Successfully merging this pull request may close this issue: Allow multi sync job works run in parallel