1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2025-03-28 02:37:11 +00:00

topology-updater: update CRs when notified

When a message received via the channel,
the main loop updates the `NodeResourceTopology` objects.

The notifier will send a message via the channel if:
1. It reached the sleep timeout.
2. It detected a change in Kubelet state files

Signed-off-by: Talor Itzhak <titzhak@redhat.com>
This commit is contained in:
Talor Itzhak 2023-01-08 16:45:32 +02:00
parent 175e0c81aa
commit 7b248ecae2
3 changed files with 21 additions and 11 deletions

View file

@ -91,7 +91,10 @@ func main() {
}
// Get new TopologyUpdater instance
instance := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope)
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope)
if err != nil {
klog.Exit(err)
}
if err = instance.Run(); err != nil {
klog.Exit(err)

View file

@ -50,8 +50,8 @@ type Info struct {
Event EventType
}
func New(sleepInterval time.Duration, dest chan<- Info, kubeletDirPath string) (*Notifier, error) {
ch, err := createFSWatcherEvent([]string{kubeletDirPath})
func New(sleepInterval time.Duration, dest chan<- Info, kubeletStateDir string) (*Notifier, error) {
ch, err := createFSWatcherEvent([]string{kubeletStateDir})
if err != nil {
return nil, err
}

View file

@ -20,16 +20,16 @@ import (
"fmt"
"os"
"path/filepath"
"time"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"golang.org/x/net/context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
"sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater/kubeletnotifier"
"sigs.k8s.io/node-feature-discovery/pkg/podres"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
@ -88,23 +88,31 @@ type nfdTopologyUpdater struct {
apihelper apihelper.APIHelpers
resourcemonitorArgs resourcemonitor.Args
stop chan struct{} // channel for signaling stop
eventSource <-chan kubeletnotifier.Info
configFilePath string
config *NFDConfig
}
// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) NfdTopologyUpdater {
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) (NfdTopologyUpdater, error) {
eventSource := make(chan kubeletnotifier.Info)
ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
if err != nil {
return nil, err
}
go ntf.Run()
nfd := &nfdTopologyUpdater{
args: args,
resourcemonitorArgs: resourcemonitorArgs,
nodeInfo: newStaticNodeInfo(policy, scope),
stop: make(chan struct{}, 1),
eventSource: eventSource,
config: &NFDConfig{},
}
if args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(args.ConfigFile)
}
return nfd
return nfd, nil
}
// Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after
@ -150,10 +158,9 @@ func (w *nfdTopologyUpdater) Run() error {
klog.V(2).Infof("resAggr is: %v\n", resAggr)
crTrigger := time.NewTicker(w.resourcemonitorArgs.SleepInterval)
for {
select {
case <-crTrigger.C:
case <-w.eventSource:
klog.Infof("Scanning")
scanResponse, err := resScan.Scan()
utils.KlogDump(1, "podResources are", " ", scanResponse.PodResources)