mirror of
https://github.com/kubernetes-sigs/node-feature-discovery.git
synced 2024-12-14 11:57:51 +00:00
Merge pull request #1133 from AhmedGrati/feat-parallelize-nodes-update
feat: parallelize nodes update
This commit is contained in:
commit
306969a945
11 changed files with 305 additions and 43 deletions
3
Makefile
3
Makefile
|
@ -239,3 +239,6 @@ site-build:
|
|||
site-serve:
|
||||
@mkdir -p docs/vendor/bundle
|
||||
$(SITE_BUILD_CMD) sh -c "bundle install && jekyll serve $(JEKYLL_OPTS) -H 127.0.0.1"
|
||||
|
||||
benchmark:
|
||||
go test -bench=./pkg/nfd-master -run=^# ./pkg/nfd-master
|
||||
|
|
|
@ -70,6 +70,8 @@ func main() {
|
|||
args.Overrides.NoPublish = overrides.NoPublish
|
||||
case "resync-period":
|
||||
args.Overrides.ResyncPeriod = overrides.ResyncPeriod
|
||||
case "nfd-api-parallelism":
|
||||
args.Overrides.NfdApiParallelism = overrides.NfdApiParallelism
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -156,5 +158,8 @@ func initFlags(flagset *flag.FlagSet) (*master.Args, *master.ConfigOverrideArgs)
|
|||
flagset.Var(overrides.ResyncPeriod, "resync-period",
|
||||
"Specify the NFD API controller resync period."+
|
||||
"It has an effect when the NodeFeature API has been enabled (with -enable-nodefeature-api).")
|
||||
overrides.NfdApiParallelism = flagset.Int("nfd-api-parallelism", 10, "Defines the maximum number of goroutines responsible of updating nodes. "+
|
||||
"Can be used for the throttling mechanism. It has effect only when -enable-nodefeature-api has been set.")
|
||||
|
||||
return args, overrides
|
||||
}
|
||||
|
|
|
@ -11,3 +11,4 @@
|
|||
# renewDeadline: 10s
|
||||
# # this value has to be greater than 0
|
||||
# retryPeriod: 2s
|
||||
# nfdApiParallelism: 10
|
||||
|
|
|
@ -110,6 +110,9 @@ spec:
|
|||
{{- if .Values.master.resyncPeriod }}
|
||||
- "-resync-period={{ .Values.master.resyncPeriod }}"
|
||||
{{- end }}
|
||||
{{- if .Values.master.nfdApiParallelism | empty | not }}
|
||||
- "-nfd-api-parallelism={{ .Values.master.nfdApiParallelism }}"
|
||||
{{- end }}
|
||||
{{- if .Values.tls.enable }}
|
||||
- "-ca-file=/etc/kubernetes/node-feature-discovery/certs/ca.crt"
|
||||
- "-key-file=/etc/kubernetes/node-feature-discovery/certs/tls.key"
|
||||
|
|
|
@ -27,6 +27,7 @@ master:
|
|||
# renewDeadline: 10s
|
||||
# # this value has to be greater than 0
|
||||
# retryPeriod: 2s
|
||||
# nfdApiParallelism: 10
|
||||
### <NFD-MASTER-CONF-END-DO-NOT-REMOVE>
|
||||
# The TCP port that nfd-master listens for incoming requests. Default: 8080
|
||||
port: 8080
|
||||
|
@ -39,6 +40,7 @@ master:
|
|||
enableTaints: false
|
||||
crdController: null
|
||||
featureRulesController: null
|
||||
nfdApiParallelism: null
|
||||
deploymentAnnotations: {}
|
||||
replicaCount: 1
|
||||
|
||||
|
|
|
@ -131,6 +131,7 @@ We have introduced the following Chart parameters.
|
|||
| `master.annotations` | dict | {} | NFD master pod [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
|
||||
| `master.affinity` | dict | | NFD master pod required [node affinity](https://kubernetes.io/docs/tasks/configure-pod-container/assign-pods-nodes-using-node-affinity/) |
|
||||
| `master.deploymentAnnotations` | dict | {} | NFD master deployment [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
|
||||
| `master.nfdApiParallelism` | integer | 10 | Specifies the maximum number of concurrent node updates. |
|
||||
| `master.config` | dict | | NFD master [configuration](../reference/master-configuration-reference) |
|
||||
|
||||
### Worker pod parameters
|
||||
|
|
|
@ -313,6 +313,21 @@ Example:
|
|||
nfd-master -options='{"noPublish": true}'
|
||||
```
|
||||
|
||||
### -nfd-api-parallelism
|
||||
|
||||
The `-nfd-api-parallelism` flag can be used to specify the maximum
|
||||
number of concurrent node updates.
|
||||
|
||||
It takes effect only when `-enable-nodefeature-api` has been set.
|
||||
|
||||
Default: 10
|
||||
|
||||
Example:
|
||||
|
||||
```bash
|
||||
nfd-master -nfd-api-parallelism=1
|
||||
```
|
||||
|
||||
### Logging
|
||||
|
||||
The following logging-related flags are inherited from the
|
||||
|
|
|
@ -189,3 +189,18 @@ Example:
|
|||
leaderElection:
|
||||
retryPeriod: 2s
|
||||
```
|
||||
|
||||
### nfdApiParallelism
|
||||
|
||||
The `nfdApiParallelism` option can be used to specify the maximum
|
||||
number of concurrent node updates.
|
||||
|
||||
It takes effect only when `-enable-nodefeature-api` has been set.
|
||||
|
||||
Default: 10
|
||||
|
||||
Example:
|
||||
|
||||
```yaml
|
||||
nfdApiParallelism: 1
|
||||
```
|
||||
|
|
|
@ -34,10 +34,15 @@ import (
|
|||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
k8sclient "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/apis/nfd/v1alpha1"
|
||||
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/pkg/apis/nfd/v1alpha1"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned/fake"
|
||||
nfdscheme "sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned/scheme"
|
||||
nfdinformers "sigs.k8s.io/node-feature-discovery/pkg/generated/informers/externalversions"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/labeler"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/utils"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/version"
|
||||
|
@ -57,6 +62,60 @@ func newMockNode() *corev1.Node {
|
|||
return &n
|
||||
}
|
||||
|
||||
func mockNodeList() *corev1.NodeList {
|
||||
l := corev1.NodeList{}
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
n := corev1.Node{}
|
||||
n.Name = fmt.Sprintf("node %v", i)
|
||||
n.Labels = map[string]string{}
|
||||
n.Annotations = map[string]string{}
|
||||
n.Status.Capacity = corev1.ResourceList{}
|
||||
|
||||
l.Items = append(l.Items, n)
|
||||
}
|
||||
return &l
|
||||
}
|
||||
|
||||
func newMockNfdAPIController(client *fake.Clientset) *nfdController {
|
||||
c := &nfdController{
|
||||
stopChan: make(chan struct{}, 1),
|
||||
updateAllNodesChan: make(chan struct{}, 1),
|
||||
updateOneNodeChan: make(chan string),
|
||||
}
|
||||
|
||||
informerFactory := nfdinformers.NewSharedInformerFactory(client, 1*time.Hour)
|
||||
|
||||
// Add informer for NodeFeature objects
|
||||
featureInformer := informerFactory.Nfd().V1alpha1().NodeFeatures()
|
||||
if _, err := featureInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {},
|
||||
DeleteFunc: func(obj interface{}) {},
|
||||
}); err != nil {
|
||||
return nil
|
||||
}
|
||||
c.featureLister = featureInformer.Lister()
|
||||
|
||||
// Add informer for NodeFeatureRule objects
|
||||
ruleInformer := informerFactory.Nfd().V1alpha1().NodeFeatureRules()
|
||||
if _, err := ruleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(object interface{}) {},
|
||||
UpdateFunc: func(oldObject, newObject interface{}) {},
|
||||
DeleteFunc: func(object interface{}) {},
|
||||
}); err != nil {
|
||||
return nil
|
||||
}
|
||||
c.ruleLister = ruleInformer.Lister()
|
||||
|
||||
// Start informers
|
||||
informerFactory.Start(c.stopChan)
|
||||
|
||||
utilruntime.Must(nfdv1alpha1.AddToScheme(nfdscheme.Scheme))
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func newMockMaster(apihelper apihelper.APIHelpers) *nfdMaster {
|
||||
return &nfdMaster{
|
||||
nodeName: mockNodeName,
|
||||
|
@ -676,11 +735,14 @@ extraLabelNs: ["added.ns.io"]
|
|||
writeConfig(`
|
||||
extraLabelNs: ["override.ns.io"]
|
||||
resyncPeriod: '2h'
|
||||
nfdApiParallelism: 300
|
||||
`)
|
||||
So(func() interface{} { return master.config.ExtraLabelNs },
|
||||
withTimeout, 2*time.Second, ShouldResemble, utils.StringSetVal{"override.ns.io": struct{}{}})
|
||||
So(func() interface{} { return master.config.ResyncPeriod.Duration },
|
||||
withTimeout, 2*time.Second, ShouldResemble, time.Duration(2)*time.Hour)
|
||||
So(func() interface{} { return master.config.NfdApiParallelism },
|
||||
withTimeout, 2*time.Second, ShouldResemble, 300)
|
||||
|
||||
// Removing config file should get back our defaults
|
||||
err = os.RemoveAll(tmpDir)
|
||||
|
@ -689,6 +751,8 @@ resyncPeriod: '2h'
|
|||
withTimeout, 2*time.Second, ShouldResemble, utils.StringSetVal{})
|
||||
So(func() interface{} { return master.config.ResyncPeriod.Duration },
|
||||
withTimeout, 2*time.Second, ShouldResemble, time.Duration(1)*time.Hour)
|
||||
So(func() interface{} { return master.config.NfdApiParallelism },
|
||||
withTimeout, 2*time.Second, ShouldResemble, 10)
|
||||
|
||||
// Re-creating config dir and file should change the config
|
||||
err = os.MkdirAll(configDir, 0755)
|
||||
|
@ -696,15 +760,56 @@ resyncPeriod: '2h'
|
|||
writeConfig(`
|
||||
extraLabelNs: ["another.override.ns"]
|
||||
resyncPeriod: '3m'
|
||||
nfdApiParallelism: 100
|
||||
`)
|
||||
So(func() interface{} { return master.config.ExtraLabelNs },
|
||||
withTimeout, 2*time.Second, ShouldResemble, utils.StringSetVal{"another.override.ns": struct{}{}})
|
||||
So(func() interface{} { return master.config.ResyncPeriod.Duration },
|
||||
withTimeout, 2*time.Second, ShouldResemble, time.Duration(3)*time.Minute)
|
||||
So(func() interface{} { return master.config.NfdApiParallelism },
|
||||
withTimeout, 2*time.Second, ShouldResemble, 100)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkNfdAPIUpdateAllNodes(b *testing.B) {
|
||||
mockAPIHelper := new(apihelper.MockAPIHelpers)
|
||||
|
||||
mockMaster := newMockMaster(mockAPIHelper)
|
||||
mockMaster.nfdController = newMockNfdAPIController(fake.NewSimpleClientset())
|
||||
mockMaster.config.NoPublish = true
|
||||
|
||||
mockNodeUpdaterPool := newNodeUpdaterPool(mockMaster)
|
||||
mockMaster.nodeUpdaterPool = mockNodeUpdaterPool
|
||||
|
||||
mockClient := &k8sclient.Clientset{}
|
||||
|
||||
statusPatches := []apihelper.JsonPatch{}
|
||||
metadataPatches := []apihelper.JsonPatch{
|
||||
{Op: "add", Path: "/metadata/annotations/nfd.node.kubernetes.io~1feature-labels", Value: ""}, {Op: "add", Path: "/metadata/annotations/nfd.node.kubernetes.io~1extended-resources", Value: ""},
|
||||
}
|
||||
|
||||
mockAPIHelper.On("GetClient").Return(mockClient, nil)
|
||||
mockAPIHelper.On("GetNodes", mockClient).Return(mockNodeList(), nil)
|
||||
|
||||
mockNodeUpdaterPool.start(10)
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
nodeName := fmt.Sprintf("node %v", i)
|
||||
node := corev1.Node{}
|
||||
node.Name = nodeName
|
||||
mockAPIHelper.On("GetNode", mockClient, nodeName).Return(&node, nil)
|
||||
mockAPIHelper.On("PatchNodeStatus", mockClient, nodeName, mock.MatchedBy(jsonPatchMatcher(statusPatches))).Return(nil)
|
||||
mockAPIHelper.On("PatchNode", mockClient, nodeName, mock.MatchedBy(jsonPatchMatcher(metadataPatches))).Return(nil)
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = mockMaster.nfdAPIUpdateAllNodes()
|
||||
}
|
||||
fmt.Println(b.Elapsed())
|
||||
}
|
||||
|
||||
// withTimeout is a custom assertion for polling a value asynchronously
|
||||
// actual is a function for getting the actual value
|
||||
// expected[0] is a time.Duration value specifying the timeout
|
||||
|
|
|
@ -69,14 +69,15 @@ type Annotations map[string]string
|
|||
|
||||
// NFDConfig contains the configuration settings of NfdMaster.
|
||||
type NFDConfig struct {
|
||||
DenyLabelNs utils.StringSetVal
|
||||
ExtraLabelNs utils.StringSetVal
|
||||
LabelWhiteList utils.RegexpVal
|
||||
NoPublish bool
|
||||
ResourceLabels utils.StringSetVal
|
||||
EnableTaints bool
|
||||
ResyncPeriod utils.DurationVal
|
||||
LeaderElection LeaderElectionConfig
|
||||
DenyLabelNs utils.StringSetVal
|
||||
ExtraLabelNs utils.StringSetVal
|
||||
LabelWhiteList utils.RegexpVal
|
||||
NoPublish bool
|
||||
ResourceLabels utils.StringSetVal
|
||||
EnableTaints bool
|
||||
ResyncPeriod utils.DurationVal
|
||||
LeaderElection LeaderElectionConfig
|
||||
NfdApiParallelism int
|
||||
}
|
||||
|
||||
// LeaderElectionConfig contains the configuration for leader election
|
||||
|
@ -88,13 +89,14 @@ type LeaderElectionConfig struct {
|
|||
|
||||
// ConfigOverrideArgs are args that override config file options
|
||||
type ConfigOverrideArgs struct {
|
||||
DenyLabelNs *utils.StringSetVal
|
||||
ExtraLabelNs *utils.StringSetVal
|
||||
LabelWhiteList *utils.RegexpVal
|
||||
ResourceLabels *utils.StringSetVal
|
||||
EnableTaints *bool
|
||||
NoPublish *bool
|
||||
ResyncPeriod *utils.DurationVal
|
||||
DenyLabelNs *utils.StringSetVal
|
||||
ExtraLabelNs *utils.StringSetVal
|
||||
LabelWhiteList *utils.RegexpVal
|
||||
ResourceLabels *utils.StringSetVal
|
||||
EnableTaints *bool
|
||||
NoPublish *bool
|
||||
ResyncPeriod *utils.DurationVal
|
||||
NfdApiParallelism *int
|
||||
}
|
||||
|
||||
// Args holds command line arguments
|
||||
|
@ -130,15 +132,16 @@ type NfdMaster interface {
|
|||
type nfdMaster struct {
|
||||
*nfdController
|
||||
|
||||
args Args
|
||||
namespace string
|
||||
nodeName string
|
||||
configFilePath string
|
||||
server *grpc.Server
|
||||
stop chan struct{}
|
||||
ready chan bool
|
||||
apihelper apihelper.APIHelpers
|
||||
kubeconfig *restclient.Config
|
||||
args Args
|
||||
namespace string
|
||||
nodeName string
|
||||
configFilePath string
|
||||
server *grpc.Server
|
||||
stop chan struct{}
|
||||
ready chan bool
|
||||
apihelper apihelper.APIHelpers
|
||||
kubeconfig *restclient.Config
|
||||
nodeUpdaterPool *nodeUpdaterPool
|
||||
deniedNs
|
||||
config *NFDConfig
|
||||
}
|
||||
|
@ -176,18 +179,22 @@ func NewNfdMaster(args *Args) (NfdMaster, error) {
|
|||
if args.ConfigFile != "" {
|
||||
nfd.configFilePath = filepath.Clean(args.ConfigFile)
|
||||
}
|
||||
|
||||
nfd.nodeUpdaterPool = newNodeUpdaterPool(nfd)
|
||||
|
||||
return nfd, nil
|
||||
}
|
||||
|
||||
func newDefaultConfig() *NFDConfig {
|
||||
return &NFDConfig{
|
||||
LabelWhiteList: utils.RegexpVal{Regexp: *regexp.MustCompile("")},
|
||||
DenyLabelNs: utils.StringSetVal{},
|
||||
ExtraLabelNs: utils.StringSetVal{},
|
||||
NoPublish: false,
|
||||
ResourceLabels: utils.StringSetVal{},
|
||||
EnableTaints: false,
|
||||
ResyncPeriod: utils.DurationVal{Duration: time.Duration(1) * time.Hour},
|
||||
LabelWhiteList: utils.RegexpVal{Regexp: *regexp.MustCompile("")},
|
||||
DenyLabelNs: utils.StringSetVal{},
|
||||
ExtraLabelNs: utils.StringSetVal{},
|
||||
NoPublish: false,
|
||||
NfdApiParallelism: 10,
|
||||
ResourceLabels: utils.StringSetVal{},
|
||||
EnableTaints: false,
|
||||
ResyncPeriod: utils.DurationVal{Duration: time.Duration(1) * time.Hour},
|
||||
LeaderElection: LeaderElectionConfig{
|
||||
LeaseDuration: utils.DurationVal{Duration: time.Duration(15) * time.Second},
|
||||
RetryPeriod: utils.DurationVal{Duration: time.Duration(2) * time.Second},
|
||||
|
@ -220,6 +227,8 @@ func (m *nfdMaster) Run() error {
|
|||
}
|
||||
}
|
||||
|
||||
m.nodeUpdaterPool.start(m.config.NfdApiParallelism)
|
||||
|
||||
// Create watcher for config file
|
||||
configWatch, err := utils.CreateFsWatcher(time.Second, m.configFilePath)
|
||||
if err != nil {
|
||||
|
@ -276,6 +285,10 @@ func (m *nfdMaster) Run() error {
|
|||
if m.nfdController != nil && m.args.EnableNodeFeatureApi {
|
||||
m.nfdController.updateAllNodesChan <- struct{}{}
|
||||
}
|
||||
// Restart the node updater pool
|
||||
m.nodeUpdaterPool.stop()
|
||||
m.nodeUpdaterPool.start(m.config.NfdApiParallelism)
|
||||
|
||||
case <-m.stop:
|
||||
klog.InfoS("shutting down nfd-master")
|
||||
return nil
|
||||
|
@ -359,10 +372,7 @@ func (m *nfdMaster) nfdAPIUpdateHandler() {
|
|||
case nodeName := <-m.nfdController.updateOneNodeChan:
|
||||
updateNodes[nodeName] = struct{}{}
|
||||
case <-rateLimit:
|
||||
// Check what we need to do
|
||||
// TODO: we might want to update multiple nodes in parallel
|
||||
errUpdateAll := false
|
||||
errNodes := make(map[string]struct{})
|
||||
if updateAll {
|
||||
if err := m.nfdAPIUpdateAllNodes(); err != nil {
|
||||
klog.ErrorS(err, "failed to update nodes")
|
||||
|
@ -370,16 +380,13 @@ func (m *nfdMaster) nfdAPIUpdateHandler() {
|
|||
}
|
||||
} else {
|
||||
for nodeName := range updateNodes {
|
||||
if err := m.nfdAPIUpdateOneNode(nodeName); err != nil {
|
||||
klog.ErrorS(err, "failed to update node", "nodeName", nodeName)
|
||||
errNodes[nodeName] = struct{}{}
|
||||
}
|
||||
m.nodeUpdaterPool.queue.Add(nodeName)
|
||||
}
|
||||
}
|
||||
|
||||
// Reset "work queue" and timer, will cause re-try if errors happened
|
||||
// Reset "work queue" and timer
|
||||
updateAll = errUpdateAll
|
||||
updateNodes = errNodes
|
||||
updateNodes = map[string]struct{}{}
|
||||
rateLimit = time.After(time.Second)
|
||||
}
|
||||
}
|
||||
|
@ -393,6 +400,8 @@ func (m *nfdMaster) Stop() {
|
|||
m.nfdController.stop()
|
||||
}
|
||||
|
||||
m.nodeUpdaterPool.stop()
|
||||
|
||||
close(m.stop)
|
||||
}
|
||||
|
||||
|
@ -677,9 +686,7 @@ func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
|
|||
}
|
||||
|
||||
for _, node := range nodes.Items {
|
||||
if err := m.nfdAPIUpdateOneNode(node.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
m.nodeUpdaterPool.queue.Add(node.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -1182,6 +1189,13 @@ func (m *nfdMaster) configure(filepath string, overrides string) error {
|
|||
if m.args.Overrides.ResyncPeriod != nil {
|
||||
c.ResyncPeriod = *m.args.Overrides.ResyncPeriod
|
||||
}
|
||||
if m.args.Overrides.NfdApiParallelism != nil {
|
||||
c.NfdApiParallelism = *m.args.Overrides.NfdApiParallelism
|
||||
}
|
||||
|
||||
if c.NfdApiParallelism <= 0 {
|
||||
return fmt.Errorf("the maximum number of concurrent labelers should be a non-zero positive number")
|
||||
}
|
||||
|
||||
m.config = c
|
||||
if !c.NoPublish {
|
||||
|
|
98
pkg/nfd-master/node-updater-pool.go
Normal file
98
pkg/nfd-master/node-updater-pool.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package nfdmaster
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
type nodeUpdaterPool struct {
|
||||
queue workqueue.RateLimitingInterface
|
||||
sync.Mutex
|
||||
|
||||
wg sync.WaitGroup
|
||||
nfdMaster *nfdMaster
|
||||
}
|
||||
|
||||
func newNodeUpdaterPool(nfdMaster *nfdMaster) *nodeUpdaterPool {
|
||||
return &nodeUpdaterPool{
|
||||
nfdMaster: nfdMaster,
|
||||
wg: sync.WaitGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
func (u *nodeUpdaterPool) processNodeLabelRequest(queue workqueue.RateLimitingInterface) bool {
|
||||
nodeName, quit := queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
|
||||
defer queue.Done(nodeName)
|
||||
|
||||
if err := u.nfdMaster.nfdAPIUpdateOneNode(nodeName.(string)); err != nil {
|
||||
if queue.NumRequeues(nodeName) < 5 {
|
||||
klog.InfoS("retrying labeling request for node", "nodeName", nodeName)
|
||||
queue.AddRateLimited(nodeName)
|
||||
return true
|
||||
} else {
|
||||
klog.ErrorS(err, "error labeling node", "nodeName", nodeName)
|
||||
}
|
||||
}
|
||||
queue.Forget(nodeName)
|
||||
return true
|
||||
}
|
||||
|
||||
func (u *nodeUpdaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
|
||||
for u.processNodeLabelRequest(queue) {
|
||||
}
|
||||
u.wg.Done()
|
||||
}
|
||||
|
||||
func (u *nodeUpdaterPool) start(parallelism int) {
|
||||
u.Lock()
|
||||
defer u.Unlock()
|
||||
|
||||
if u.queue != nil && !u.queue.ShuttingDown() {
|
||||
klog.InfoS("the NFD master node updater pool is already running.")
|
||||
return
|
||||
}
|
||||
|
||||
klog.InfoS("starting the NFD master node updater pool", "parallelism", parallelism)
|
||||
u.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
||||
|
||||
for i := 0; i < parallelism; i++ {
|
||||
u.wg.Add(1)
|
||||
go u.runNodeUpdater(u.queue)
|
||||
}
|
||||
}
|
||||
|
||||
func (u *nodeUpdaterPool) stop() {
|
||||
u.Lock()
|
||||
defer u.Unlock()
|
||||
|
||||
if u.queue == nil || u.queue.ShuttingDown() {
|
||||
klog.InfoS("the NFD master node updater pool is not running.")
|
||||
return
|
||||
}
|
||||
|
||||
klog.InfoS("stopping the NFD master node updater pool")
|
||||
u.queue.ShutDown()
|
||||
u.wg.Wait()
|
||||
}
|
Loading…
Reference in a new issue