diff --git a/pkg/nfd-master/updater-pool.go b/pkg/nfd-master/updater-pool.go index fec6027137..cf4419d35b 100644 --- a/pkg/nfd-master/updater-pool.go +++ b/pkg/nfd-master/updater-pool.go @@ -31,6 +31,7 @@ import ( ) type updaterPool struct { + started bool queue workqueue.RateLimitingInterface nfgQueue workqueue.RateLimitingInterface sync.RWMutex @@ -129,16 +130,11 @@ func (u *updaterPool) start(parallelism int) { u.Lock() defer u.Unlock() - if u.queue != nil && !u.queue.ShuttingDown() { + if u.started { klog.InfoS("the NFD master updater pool is already running.") return } - if u.nfgQueue != nil && !u.nfgQueue.ShuttingDown() { - klog.InfoS("the NFD master node feature group updater pool is already running.") - return - } - klog.InfoS("starting the NFD master updater pool", "parallelism", parallelism) // Create ratelimiter. Mimic workqueue.DefaultControllerRateLimiter() but @@ -158,18 +154,14 @@ func (u *updaterPool) start(parallelism int) { go u.runNodeFeatureGroupUpdater(u.nfgQueue) } } + u.started = true } func (u *updaterPool) stop() { u.Lock() defer u.Unlock() - if u.queue == nil || u.queue.ShuttingDown() { - klog.InfoS("the NFD master updater pool is not running.") - return - } - - if u.nfgQueue == nil || u.nfgQueue.ShuttingDown() { + if !u.started { klog.InfoS("the NFD master updater pool is not running.") return } @@ -179,6 +171,13 @@ func (u *updaterPool) stop() { u.wg.Wait() u.nfgQueue.ShutDown() u.nfgWg.Wait() + u.started = false +} + +func (u *updaterPool) running() bool { + u.RLock() + defer u.RUnlock() + return u.started } func (u *updaterPool) addNode(nodeName string) { diff --git a/pkg/nfd-master/updater-pool_test.go b/pkg/nfd-master/updater-pool_test.go index efeab40434..7430d5a1df 100644 --- a/pkg/nfd-master/updater-pool_test.go +++ b/pkg/nfd-master/updater-pool_test.go @@ -37,8 +37,15 @@ func TestUpdaterStart(t *testing.T) { fakeMaster := newFakeMaster() updaterPool := newFakeupdaterPool(fakeMaster) + Convey("New node updater pool should report running=false", t, func() { + So(updaterPool.running(), ShouldBeFalse) + }) + Convey("When starting the node updater pool", t, func() { updaterPool.start(10) + Convey("Running node updater pool should report running=true", func() { + So(updaterPool.running(), ShouldBeTrue) + }) q := updaterPool.queue Convey("Node updater pool queue properties should change", func() { So(q, ShouldNotBeNil) @@ -57,9 +64,15 @@ func TestNodeUpdaterStop(t *testing.T) { updaterPool := newFakeupdaterPool(fakeMaster) updaterPool.start(10) + Convey("Running node updater pool should report running=true", t, func() { + So(updaterPool.running(), ShouldBeTrue) + }) Convey("When stoping the node updater pool", t, func() { updaterPool.stop() + Convey("Stopped node updater pool should report running=false", func() { + So(updaterPool.running(), ShouldBeFalse) + }) Convey("Node updater pool queue should be removed", func() { // Wait for the wg.Done() So(func() interface{} {