diff --git a/pkg/nfd-master/updater-pool.go b/pkg/nfd-master/updater-pool.go index cf4419d35b..b65b1942fc 100644 --- a/pkg/nfd-master/updater-pool.go +++ b/pkg/nfd-master/updater-pool.go @@ -48,14 +48,14 @@ func newUpdaterPool(nfdMaster *nfdMaster) *updaterPool { } } -func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool { - n, quit := queue.Get() +func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface) bool { + n, quit := u.queue.Get() if quit { return false } nodeName := n.(string) - defer queue.Done(nodeName) + defer u.queue.Done(nodeName) nodeUpdateRequests.Inc() @@ -63,21 +63,21 @@ func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue wo if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) { klog.InfoS("node not found, skip update", "nodeName", nodeName) } else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil { - if n := queue.NumRequeues(nodeName); n < 15 { + if n := u.queue.NumRequeues(nodeName); n < 15 { klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n) } else { klog.ErrorS(err, "node update failed, queuing for retry ", "nodeName", nodeName, "numRetries", n) // Count only long-failing attempts nodeUpdateFailures.Inc() } - queue.AddRateLimited(nodeName) + u.queue.AddRateLimited(nodeName) return true } - queue.Forget(nodeName) + u.queue.Forget(nodeName) return true } -func (u *updaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) { +func (u *updaterPool) runNodeUpdater() { var cli k8sclient.Interface if u.nfdMaster.kubeconfig != nil { // For normal execution, initialize a separate api client for each updater @@ -86,17 +86,17 @@ func (u *updaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) { // For tests, re-use the api client from nfd-master cli = u.nfdMaster.k8sClient } - for u.processNodeUpdateRequest(cli, queue) { + for u.processNodeUpdateRequest(cli) { } u.wg.Done() } -func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Interface, ngfQueue workqueue.RateLimitingInterface) bool { - nfgName, quit := ngfQueue.Get() +func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Interface) bool { + nfgName, quit := u.nfgQueue.Get() if quit { return false } - defer ngfQueue.Done(nfgName) + defer u.nfgQueue.Done(nfgName) nodeFeatureGroupUpdateRequests.Inc() @@ -106,22 +106,22 @@ func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Inte if nfg, err = getNodeFeatureGroup(cli, u.nfdMaster.namespace, nfgName.(string)); apierrors.IsNotFound(err) { klog.InfoS("NodeFeatureGroup not found, skip update", "NodeFeatureGroupName", nfgName) } else if err := u.nfdMaster.nfdAPIUpdateNodeFeatureGroup(u.nfdMaster.nfdClient, nfg); err != nil { - if n := ngfQueue.NumRequeues(nfgName); n < 15 { + if n := u.nfgQueue.NumRequeues(nfgName); n < 15 { klog.InfoS("retrying NodeFeatureGroup update", "nodeFeatureGroup", klog.KObj(nfg), "lastError", err) } else { klog.ErrorS(err, "failed to update NodeFeatureGroup, queueing for retry", "nodeFeatureGroup", klog.KObj(nfg), "lastError", err, "numRetries", n) } - ngfQueue.AddRateLimited(nfgName) + u.nfgQueue.AddRateLimited(nfgName) return true } - ngfQueue.Forget(nfgName) + u.nfgQueue.Forget(nfgName) return true } -func (u *updaterPool) runNodeFeatureGroupUpdater(ngfQueue workqueue.RateLimitingInterface) { +func (u *updaterPool) runNodeFeatureGroupUpdater() { cli := nfdclientset.NewForConfigOrDie(u.nfdMaster.kubeconfig) - for u.processNodeFeatureGroupUpdateRequest(cli, ngfQueue) { + for u.processNodeFeatureGroupUpdateRequest(cli) { } u.nfgWg.Done() } @@ -148,10 +148,10 @@ func (u *updaterPool) start(parallelism int) { for i := 0; i < parallelism; i++ { u.wg.Add(1) - go u.runNodeUpdater(u.queue) + go u.runNodeUpdater() if features.NFDFeatureGate.Enabled(features.NodeFeatureGroupAPI) { u.nfgWg.Add(1) - go u.runNodeFeatureGroupUpdater(u.nfgQueue) + go u.runNodeFeatureGroupUpdater() } } u.started = true