Skip to content

Commit

Permalink
nfd-master: cleanup updater-pool method args
Browse files Browse the repository at this point in the history
We store the work queues in the updater pool struct so we don't need to
pass those as function arguments.
  • Loading branch information
marquiz committed Sep 16, 2024
1 parent a8fd505 commit 9fad67e
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions pkg/nfd-master/updater-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,36 @@ func newUpdaterPool(nfdMaster *nfdMaster) *updaterPool {
}
}

func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool {
n, quit := queue.Get()
func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface) bool {
n, quit := u.queue.Get()
if quit {
return false
}
nodeName := n.(string)

defer queue.Done(nodeName)
defer u.queue.Done(nodeName)

nodeUpdateRequests.Inc()

// Check if node exists
if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) {
klog.InfoS("node not found, skip update", "nodeName", nodeName)
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil {
if n := queue.NumRequeues(nodeName); n < 15 {
if n := u.queue.NumRequeues(nodeName); n < 15 {
klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n)
} else {
klog.ErrorS(err, "node update failed, queuing for retry ", "nodeName", nodeName, "numRetries", n)
// Count only long-failing attempts
nodeUpdateFailures.Inc()
}
queue.AddRateLimited(nodeName)
u.queue.AddRateLimited(nodeName)
return true
}
queue.Forget(nodeName)
u.queue.Forget(nodeName)
return true
}

func (u *updaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
func (u *updaterPool) runNodeUpdater() {
var cli k8sclient.Interface
if u.nfdMaster.kubeconfig != nil {
// For normal execution, initialize a separate api client for each updater
Expand All @@ -86,17 +86,17 @@ func (u *updaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
// For tests, re-use the api client from nfd-master
cli = u.nfdMaster.k8sClient
}
for u.processNodeUpdateRequest(cli, queue) {
for u.processNodeUpdateRequest(cli) {
}
u.wg.Done()
}

func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Interface, ngfQueue workqueue.RateLimitingInterface) bool {
nfgName, quit := ngfQueue.Get()
func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Interface) bool {
nfgName, quit := u.nfgQueue.Get()
if quit {
return false
}
defer ngfQueue.Done(nfgName)
defer u.nfgQueue.Done(nfgName)

nodeFeatureGroupUpdateRequests.Inc()

Expand All @@ -106,22 +106,22 @@ func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Inte
if nfg, err = getNodeFeatureGroup(cli, u.nfdMaster.namespace, nfgName.(string)); apierrors.IsNotFound(err) {
klog.InfoS("NodeFeatureGroup not found, skip update", "NodeFeatureGroupName", nfgName)
} else if err := u.nfdMaster.nfdAPIUpdateNodeFeatureGroup(u.nfdMaster.nfdClient, nfg); err != nil {
if n := ngfQueue.NumRequeues(nfgName); n < 15 {
if n := u.nfgQueue.NumRequeues(nfgName); n < 15 {
klog.InfoS("retrying NodeFeatureGroup update", "nodeFeatureGroup", klog.KObj(nfg), "lastError", err)
} else {
klog.ErrorS(err, "failed to update NodeFeatureGroup, queueing for retry", "nodeFeatureGroup", klog.KObj(nfg), "lastError", err, "numRetries", n)
}
ngfQueue.AddRateLimited(nfgName)
u.nfgQueue.AddRateLimited(nfgName)
return true
}

ngfQueue.Forget(nfgName)
u.nfgQueue.Forget(nfgName)
return true
}

func (u *updaterPool) runNodeFeatureGroupUpdater(ngfQueue workqueue.RateLimitingInterface) {
func (u *updaterPool) runNodeFeatureGroupUpdater() {
cli := nfdclientset.NewForConfigOrDie(u.nfdMaster.kubeconfig)
for u.processNodeFeatureGroupUpdateRequest(cli, ngfQueue) {
for u.processNodeFeatureGroupUpdateRequest(cli) {
}
u.nfgWg.Done()
}
Expand All @@ -148,10 +148,10 @@ func (u *updaterPool) start(parallelism int) {

for i := 0; i < parallelism; i++ {
u.wg.Add(1)
go u.runNodeUpdater(u.queue)
go u.runNodeUpdater()
if features.NFDFeatureGate.Enabled(features.NodeFeatureGroupAPI) {
u.nfgWg.Add(1)
go u.runNodeFeatureGroupUpdater(u.nfgQueue)
go u.runNodeFeatureGroupUpdater()
}
}
u.started = true
Expand Down

0 comments on commit 9fad67e

Please sign in to comment.