1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2025-03-17 05:48:21 +00:00

Merge pull request #1844 from marquiz/devel/updater-pool-started

nfd-master: explicit state variable for the node updater pool
This commit is contained in:
Kubernetes Prow Robot 2024-08-19 06:36:20 -07:00 committed by GitHub
commit df7f65c9b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 24 additions and 12 deletions

View file

@ -31,6 +31,7 @@ import (
) )
type updaterPool struct { type updaterPool struct {
started bool
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
nfgQueue workqueue.RateLimitingInterface nfgQueue workqueue.RateLimitingInterface
sync.RWMutex sync.RWMutex
@ -129,16 +130,11 @@ func (u *updaterPool) start(parallelism int) {
u.Lock() u.Lock()
defer u.Unlock() defer u.Unlock()
if u.queue != nil && !u.queue.ShuttingDown() { if u.started {
klog.InfoS("the NFD master updater pool is already running.") klog.InfoS("the NFD master updater pool is already running.")
return return
} }
if u.nfgQueue != nil && !u.nfgQueue.ShuttingDown() {
klog.InfoS("the NFD master node feature group updater pool is already running.")
return
}
klog.InfoS("starting the NFD master updater pool", "parallelism", parallelism) klog.InfoS("starting the NFD master updater pool", "parallelism", parallelism)
// Create ratelimiter. Mimic workqueue.DefaultControllerRateLimiter() but // Create ratelimiter. Mimic workqueue.DefaultControllerRateLimiter() but
@ -158,18 +154,14 @@ func (u *updaterPool) start(parallelism int) {
go u.runNodeFeatureGroupUpdater(u.nfgQueue) go u.runNodeFeatureGroupUpdater(u.nfgQueue)
} }
} }
u.started = true
} }
func (u *updaterPool) stop() { func (u *updaterPool) stop() {
u.Lock() u.Lock()
defer u.Unlock() defer u.Unlock()
if u.queue == nil || u.queue.ShuttingDown() { if !u.started {
klog.InfoS("the NFD master updater pool is not running.")
return
}
if u.nfgQueue == nil || u.nfgQueue.ShuttingDown() {
klog.InfoS("the NFD master updater pool is not running.") klog.InfoS("the NFD master updater pool is not running.")
return return
} }
@ -179,6 +171,13 @@ func (u *updaterPool) stop() {
u.wg.Wait() u.wg.Wait()
u.nfgQueue.ShutDown() u.nfgQueue.ShutDown()
u.nfgWg.Wait() u.nfgWg.Wait()
u.started = false
}
func (u *updaterPool) running() bool {
u.RLock()
defer u.RUnlock()
return u.started
} }
func (u *updaterPool) addNode(nodeName string) { func (u *updaterPool) addNode(nodeName string) {

View file

@ -37,8 +37,15 @@ func TestUpdaterStart(t *testing.T) {
fakeMaster := newFakeMaster() fakeMaster := newFakeMaster()
updaterPool := newFakeupdaterPool(fakeMaster) updaterPool := newFakeupdaterPool(fakeMaster)
Convey("New node updater pool should report running=false", t, func() {
So(updaterPool.running(), ShouldBeFalse)
})
Convey("When starting the node updater pool", t, func() { Convey("When starting the node updater pool", t, func() {
updaterPool.start(10) updaterPool.start(10)
Convey("Running node updater pool should report running=true", func() {
So(updaterPool.running(), ShouldBeTrue)
})
q := updaterPool.queue q := updaterPool.queue
Convey("Node updater pool queue properties should change", func() { Convey("Node updater pool queue properties should change", func() {
So(q, ShouldNotBeNil) So(q, ShouldNotBeNil)
@ -57,9 +64,15 @@ func TestNodeUpdaterStop(t *testing.T) {
updaterPool := newFakeupdaterPool(fakeMaster) updaterPool := newFakeupdaterPool(fakeMaster)
updaterPool.start(10) updaterPool.start(10)
Convey("Running node updater pool should report running=true", t, func() {
So(updaterPool.running(), ShouldBeTrue)
})
Convey("When stoping the node updater pool", t, func() { Convey("When stoping the node updater pool", t, func() {
updaterPool.stop() updaterPool.stop()
Convey("Stopped node updater pool should report running=false", func() {
So(updaterPool.running(), ShouldBeFalse)
})
Convey("Node updater pool queue should be removed", func() { Convey("Node updater pool queue should be removed", func() {
// Wait for the wg.Done() // Wait for the wg.Done()
So(func() interface{} { So(func() interface{} {