mirror of
https://github.com/kubernetes-sigs/node-feature-discovery.git
synced 2025-03-17 13:58:21 +00:00
Merge pull request #1427 from k8s-infra-cherrypick-robot/cherry-pick-1425-to-release-0.14
[release-0.14] nfd-master: fix retry of node updates
This commit is contained in:
commit
5c298b9273
3 changed files with 13 additions and 3 deletions
|
@ -18,7 +18,9 @@ package nfdmaster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
@ -48,7 +50,7 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
|
||||||
|
|
||||||
nodeUpdateRequests.Inc()
|
nodeUpdateRequests.Inc()
|
||||||
if err := u.nfdMaster.nfdAPIUpdateOneNode(nodeName.(string)); err != nil {
|
if err := u.nfdMaster.nfdAPIUpdateOneNode(nodeName.(string)); err != nil {
|
||||||
if queue.NumRequeues(nodeName) < 5 {
|
if queue.NumRequeues(nodeName) < 15 {
|
||||||
klog.InfoS("retrying node update", "nodeName", nodeName)
|
klog.InfoS("retrying node update", "nodeName", nodeName)
|
||||||
queue.AddRateLimited(nodeName)
|
queue.AddRateLimited(nodeName)
|
||||||
return true
|
return true
|
||||||
|
@ -77,7 +79,14 @@ func (u *nodeUpdaterPool) start(parallelism int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.InfoS("starting the NFD master node updater pool", "parallelism", parallelism)
|
klog.InfoS("starting the NFD master node updater pool", "parallelism", parallelism)
|
||||||
u.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
|
||||||
|
// Create ratelimiter. Mimic workqueue.DefaultControllerRateLimiter() but
|
||||||
|
// with modified per-item (node) rate limiting parameters.
|
||||||
|
rl := workqueue.NewMaxOfRateLimiter(
|
||||||
|
workqueue.NewItemExponentialFailureRateLimiter(50*time.Millisecond, 100*time.Second),
|
||||||
|
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
|
||||||
|
)
|
||||||
|
u.queue = workqueue.NewRateLimitingQueue(rl)
|
||||||
|
|
||||||
for i := 0; i < parallelism; i++ {
|
for i := 0; i < parallelism; i++ {
|
||||||
u.wg.Add(1)
|
u.wg.Add(1)
|
||||||
|
|
|
@ -39,7 +39,7 @@ type k8sAnnotations map[string]string
|
||||||
func eventuallyNonControlPlaneNodes(ctx context.Context, cli clientset.Interface) AsyncAssertion {
|
func eventuallyNonControlPlaneNodes(ctx context.Context, cli clientset.Interface) AsyncAssertion {
|
||||||
return Eventually(func(g Gomega, ctx context.Context) ([]corev1.Node, error) {
|
return Eventually(func(g Gomega, ctx context.Context) ([]corev1.Node, error) {
|
||||||
return getNonControlPlaneNodes(ctx, cli)
|
return getNonControlPlaneNodes(ctx, cli)
|
||||||
}).WithPolling(1 * time.Second).WithTimeout(10 * time.Second).WithContext(ctx)
|
}).WithPolling(1 * time.Second).WithTimeout(20 * time.Second).WithContext(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MatchLabels returns a specialized Gomega matcher for checking if a list of
|
// MatchLabels returns a specialized Gomega matcher for checking if a list of
|
||||||
|
|
|
@ -795,6 +795,7 @@ core:
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
By("Verfiying node status capacity from NodeFeatureRules #4")
|
By("Verfiying node status capacity from NodeFeatureRules #4")
|
||||||
|
expectedCapacity = map[string]corev1.ResourceList{"*": {}}
|
||||||
eventuallyNonControlPlaneNodes(ctx, f.ClientSet).Should(MatchCapacity(expectedCapacity, nodes, false))
|
eventuallyNonControlPlaneNodes(ctx, f.ClientSet).Should(MatchCapacity(expectedCapacity, nodes, false))
|
||||||
|
|
||||||
By("Deleting nfd-worker daemonset")
|
By("Deleting nfd-worker daemonset")
|
||||||
|
|
Loading…
Add table
Reference in a new issue