From e318bd076e993a4a2cb0ab662c07f3d85b6d1fb9 Mon Sep 17 00:00:00 2001 From: wangyuqing4 Date: Thu, 30 May 2019 11:05:59 +0800 Subject: [PATCH 1/3] fix vk-controller cache --- pkg/controllers/job/job_controller.go | 8 ++ pkg/controllers/job/job_controller_actions.go | 7 ++ pkg/controllers/job/job_controller_resync.go | 84 +++++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 pkg/controllers/job/job_controller_resync.go diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index c82c875355..66e600a0db 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -18,6 +18,7 @@ package job import ( "fmt" + "sync" "github.com/golang/glog" @@ -100,6 +101,9 @@ type Controller struct { //Job Event recorder recorder record.EventRecorder priorityClasses map[string]*v1beta1.PriorityClass + + sync.Mutex + errTasks workqueue.RateLimitingInterface } // NewJobController create new Job Controller @@ -122,6 +126,7 @@ func NewJobController( queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), cache: jobcache.New(), + errTasks: newRateLimitingQueue(), recorder: recorder, priorityClasses: make(map[string]*v1beta1.PriorityClass), } @@ -204,6 +209,9 @@ func (cc *Controller) Run(stopCh <-chan struct{}) { go cc.cache.Run(stopCh) + // Re-sync error tasks. + go wait.Until(cc.processResyncTask, 0, stopCh) + glog.Infof("JobController is running ...... ") } diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 9aa1fbac6b..994b4e337c 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -72,6 +72,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM } // record the err, and then collect the pod info like retained pod errs = append(errs, err) + cc.resyncTask(pod) } switch pod.Status.Phase { @@ -271,6 +272,11 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt pod.Name, job.Name, err) creationErrs = append(creationErrs, err) } else { + if err != nil && apierrors.IsAlreadyExists(err) { + cc.resyncTask(pod) + } + + // TODO: maybe not pending status, maybe unknown. pending++ glog.V(3).Infof("Created Task <%s> of Job <%s/%s>", pod.Name, job.Namespace, job.Name) @@ -298,6 +304,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt glog.Errorf("Failed to delete pod %s for Job %s, err %#v", pod.Name, job.Name, err) deletionErrs = append(deletionErrs, err) + cc.resyncTask(pod) } else { glog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>", pod.Name, job.Namespace, job.Name) diff --git a/pkg/controllers/job/job_controller_resync.go b/pkg/controllers/job/job_controller_resync.go new file mode 100644 index 0000000000..ed124849bf --- /dev/null +++ b/pkg/controllers/job/job_controller_resync.go @@ -0,0 +1,84 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "fmt" + "time" + + "golang.org/x/time/rate" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" +) + +func newRateLimitingQueue() workqueue.RateLimitingInterface { + return workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 180*time.Second), + // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + )) +} + +func (cc *Controller) processResyncTask() { + obj, shutdown := cc.errTasks.Get() + if shutdown { + return + } + + defer cc.errTasks.Done(obj) + + task, ok := obj.(*v1.Pod) + if !ok { + glog.Errorf("failed to convert %v to *v1.Pod", obj) + return + } + + if err := cc.syncTask(task); err != nil { + glog.Errorf("Failed to sync pod <%v/%v>, retry it.", task.Namespace, task.Name) + cc.resyncTask(task) + } +} + +func (cc *Controller) syncTask(oldTask *v1.Pod) error { + cc.Mutex.Lock() + defer cc.Mutex.Unlock() + + newPod, err := cc.kubeClients.CoreV1().Pods(oldTask.Namespace).Get(oldTask.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + if err := cc.cache.DeletePod(oldTask); err != nil { + glog.Errorf("failed to delete cache pod <%v/%v>, err %v.", oldTask.Namespace, oldTask.Name, err) + return err + } + glog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name) + + return nil + } + return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err) + } + + return cc.cache.UpdatePod(newPod) +} + +func (cc *Controller) resyncTask(task *v1.Pod) { + cc.errTasks.AddRateLimited(task) +} From 56fa5f266259f9c6725f695cd068dfeb97ee146b Mon Sep 17 00:00:00 2001 From: wangyuqing4 Date: Mon, 3 Jun 2019 19:16:42 +0800 Subject: [PATCH 2/3] fix pc panic --- pkg/controllers/job/job_controller_actions.go | 3 +++ pkg/controllers/job/job_controller_handler.go | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 994b4e337c..773456eb0d 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -491,6 +491,9 @@ func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) error { } func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList { + cc.Mutex.Lock() + defer cc.Mutex.Unlock() + // sort task by priorityClasses var tasksPriority TasksPriority for index := range job.Spec.Tasks { diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 8df166507a..0f521ee9aa 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -394,6 +394,9 @@ func (cc *Controller) addPriorityClass(obj interface{}) { return } + cc.Mutex.Lock() + defer cc.Mutex.Unlock() + cc.priorityClasses[pc.Name] = pc return } @@ -404,6 +407,9 @@ func (cc *Controller) deletePriorityClass(obj interface{}) { return } + cc.Mutex.Lock() + defer cc.Mutex.Unlock() + delete(cc.priorityClasses, pc.Name) return } From 78b194c30a389518351d811e8f888fc232e79c12 Mon Sep 17 00:00:00 2001 From: wangyuqing4 Date: Wed, 12 Jun 2019 16:43:47 +0800 Subject: [PATCH 3/3] optimize resyncTask --- pkg/controllers/job/job_controller_resync.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/job/job_controller_resync.go b/pkg/controllers/job/job_controller_resync.go index ed124849bf..80fb8f4546 100644 --- a/pkg/controllers/job/job_controller_resync.go +++ b/pkg/controllers/job/job_controller_resync.go @@ -44,6 +44,12 @@ func (cc *Controller) processResyncTask() { return } + // one task only resync 10 times + if cc.errTasks.NumRequeues(obj) > 10 { + cc.errTasks.Forget(obj) + return + } + defer cc.errTasks.Done(obj) task, ok := obj.(*v1.Pod) @@ -53,7 +59,7 @@ func (cc *Controller) processResyncTask() { } if err := cc.syncTask(task); err != nil { - glog.Errorf("Failed to sync pod <%v/%v>, retry it.", task.Namespace, task.Name) + glog.Errorf("Failed to sync pod <%v/%v>, retry it, err %v", task.Namespace, task.Name, err) cc.resyncTask(task) } }