1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2024-12-14 11:57:51 +00:00

nfd-master: cleanup updater-pool method args

We store the work queues in the updater pool struct so we don't need to
pass those as function arguments.
This commit is contained in:
Markus Lehtonen 2024-09-16 14:50:08 +03:00
parent a8fd5051bc
commit 9fad67ee39

View file

@ -48,14 +48,14 @@ func newUpdaterPool(nfdMaster *nfdMaster) *updaterPool {
}
}
func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool {
n, quit := queue.Get()
func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface) bool {
n, quit := u.queue.Get()
if quit {
return false
}
nodeName := n.(string)
defer queue.Done(nodeName)
defer u.queue.Done(nodeName)
nodeUpdateRequests.Inc()
@ -63,21 +63,21 @@ func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue wo
if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) {
klog.InfoS("node not found, skip update", "nodeName", nodeName)
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil {
if n := queue.NumRequeues(nodeName); n < 15 {
if n := u.queue.NumRequeues(nodeName); n < 15 {
klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n)
} else {
klog.ErrorS(err, "node update failed, queuing for retry ", "nodeName", nodeName, "numRetries", n)
// Count only long-failing attempts
nodeUpdateFailures.Inc()
}
queue.AddRateLimited(nodeName)
u.queue.AddRateLimited(nodeName)
return true
}
queue.Forget(nodeName)
u.queue.Forget(nodeName)
return true
}
func (u *updaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
func (u *updaterPool) runNodeUpdater() {
var cli k8sclient.Interface
if u.nfdMaster.kubeconfig != nil {
// For normal execution, initialize a separate api client for each updater
@ -86,17 +86,17 @@ func (u *updaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
// For tests, re-use the api client from nfd-master
cli = u.nfdMaster.k8sClient
}
for u.processNodeUpdateRequest(cli, queue) {
for u.processNodeUpdateRequest(cli) {
}
u.wg.Done()
}
func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Interface, ngfQueue workqueue.RateLimitingInterface) bool {
nfgName, quit := ngfQueue.Get()
func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Interface) bool {
nfgName, quit := u.nfgQueue.Get()
if quit {
return false
}
defer ngfQueue.Done(nfgName)
defer u.nfgQueue.Done(nfgName)
nodeFeatureGroupUpdateRequests.Inc()
@ -106,22 +106,22 @@ func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Inte
if nfg, err = getNodeFeatureGroup(cli, u.nfdMaster.namespace, nfgName.(string)); apierrors.IsNotFound(err) {
klog.InfoS("NodeFeatureGroup not found, skip update", "NodeFeatureGroupName", nfgName)
} else if err := u.nfdMaster.nfdAPIUpdateNodeFeatureGroup(u.nfdMaster.nfdClient, nfg); err != nil {
if n := ngfQueue.NumRequeues(nfgName); n < 15 {
if n := u.nfgQueue.NumRequeues(nfgName); n < 15 {
klog.InfoS("retrying NodeFeatureGroup update", "nodeFeatureGroup", klog.KObj(nfg), "lastError", err)
} else {
klog.ErrorS(err, "failed to update NodeFeatureGroup, queueing for retry", "nodeFeatureGroup", klog.KObj(nfg), "lastError", err, "numRetries", n)
}
ngfQueue.AddRateLimited(nfgName)
u.nfgQueue.AddRateLimited(nfgName)
return true
}
ngfQueue.Forget(nfgName)
u.nfgQueue.Forget(nfgName)
return true
}
func (u *updaterPool) runNodeFeatureGroupUpdater(ngfQueue workqueue.RateLimitingInterface) {
func (u *updaterPool) runNodeFeatureGroupUpdater() {
cli := nfdclientset.NewForConfigOrDie(u.nfdMaster.kubeconfig)
for u.processNodeFeatureGroupUpdateRequest(cli, ngfQueue) {
for u.processNodeFeatureGroupUpdateRequest(cli) {
}
u.nfgWg.Done()
}
@ -148,10 +148,10 @@ func (u *updaterPool) start(parallelism int) {
for i := 0; i < parallelism; i++ {
u.wg.Add(1)
go u.runNodeUpdater(u.queue)
go u.runNodeUpdater()
if features.NFDFeatureGate.Enabled(features.NodeFeatureGroupAPI) {
u.nfgWg.Add(1)
go u.runNodeFeatureGroupUpdater(u.nfgQueue)
go u.runNodeFeatureGroupUpdater()
}
}
u.started = true