diff --git a/cmd/nfd-topology-updater/main.go b/cmd/nfd-topology-updater/main.go
index bdb9b7c92..0b9bef9e8 100644
--- a/cmd/nfd-topology-updater/main.go
+++ b/cmd/nfd-topology-updater/main.go
@@ -155,6 +155,8 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
 		"NFD server address to connecto to.")
 	flagset.StringVar(&args.ServerNameOverride, "server-name-override", "",
 		"Hostname expected from server certificate, useful in testing")
+	flagset.StringVar(&args.ConfigFile, "config", "/etc/kubernetes/node-feature-discovery/nfd-topology-updater.conf",
+		"Config file to use.")
 
 	klog.InitFlags(flagset)
 
diff --git a/cmd/nfd-topology-updater/main_test.go b/cmd/nfd-topology-updater/main_test.go
index e1b249808..dd2ba86e0 100644
--- a/cmd/nfd-topology-updater/main_test.go
+++ b/cmd/nfd-topology-updater/main_test.go
@@ -34,19 +34,22 @@ func TestArgsParse(t *testing.T) {
 		Convey("noPublish is set and args.sources is set to the default value", func() {
 			So(args.NoPublish, ShouldBeTrue)
 			So(args.Oneshot, ShouldBeTrue)
+			So(args.ConfigFile, ShouldEqual, "/etc/kubernetes/node-feature-discovery/nfd-topology-updater.conf")
 			So(finderArgs.SleepInterval, ShouldEqual, 60*time.Second)
 			So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
 		})
 	})
 
-	Convey("When valid args are specified for -kubelet-config-url and -sleep-interval,", func() {
+	Convey("When valid args are specified for -kubelet-config-uri, -sleep-interval and -config,", func() {
 		args, finderArgs := parseArgs(flags,
 			"-kubelet-config-uri=file:///path/testconfig.yaml",
-			"-sleep-interval=30s")
+			"-sleep-interval=30s",
+			"-config=/path/nfd-topology-updater.conf")
 
 		Convey("args.sources is set to appropriate values", func() {
 			So(args.NoPublish, ShouldBeFalse)
 			So(args.Oneshot, ShouldBeFalse)
+			So(args.ConfigFile, ShouldEqual, "/path/nfd-topology-updater.conf")
 			So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
 			So(finderArgs.KubeletConfigURI, ShouldEqual, "file:///path/testconfig.yaml")
 			So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
diff --git a/pkg/nfd-client/topology-updater/nfd-topology-updater.go b/pkg/nfd-client/topology-updater/nfd-topology-updater.go
index baa8f378e..1853536e2 100644
--- a/pkg/nfd-client/topology-updater/nfd-topology-updater.go
+++ b/pkg/nfd-client/topology-updater/nfd-topology-updater.go
@@ -18,6 +18,8 @@ package topologyupdater
 
 import (
 	"fmt"
+	"os"
+	"path/filepath"
 	"time"
 
 	"k8s.io/klog/v2"
@@ -32,6 +34,7 @@ import (
 	pb "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater"
 	"sigs.k8s.io/node-feature-discovery/pkg/utils"
 	"sigs.k8s.io/node-feature-discovery/pkg/version"
+	"sigs.k8s.io/yaml"
 )
 
 // Args are the command line arguments
@@ -40,6 +43,12 @@ type Args struct {
 	NoPublish      bool
 	Oneshot        bool
 	KubeConfigFile string
+	ConfigFile     string
+}
+
+// NFDConfig contains the configuration settings of NFDTopologyUpdater.
+type NFDConfig struct {
+	ExcludeList map[string][]string
 }
 
 type NfdTopologyUpdater interface {
@@ -59,6 +68,8 @@ type nfdTopologyUpdater struct {
 	certWatch      *utils.FsWatcher
 	client         pb.NodeTopologyClient
 	stop           chan struct{} // channel for signaling stop
+	configFilePath string
+	config         *NFDConfig
 }
 
 // NewTopologyUpdater creates a new NfdTopologyUpdater instance.
@@ -75,7 +86,11 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol
 		nodeInfo: &staticNodeInfo{
 			tmPolicy: policy,
 		},
-		stop: make(chan struct{}, 1),
+		stop:   make(chan struct{}, 1),
+		config: &NFDConfig{},
+	}
+	if args.ConfigFile != "" {
+		nfd.configFilePath = filepath.Clean(args.ConfigFile)
 	}
 	return nfd, nil
 }
 
@@ -99,6 +114,9 @@ func (w *nfdTopologyUpdater) Run() error {
 		}
 		kubeApihelper = apihelper.K8sHelpers{Kubeconfig: kubeconfig}
 	}
+	if err := w.configure(); err != nil {
+		return fmt.Errorf("failed to configure Node Feature Discovery Topology Updater: %w", err)
+	}
 
 	var resScan resourcemonitor.ResourcesScanner
 
@@ -113,7 +131,8 @@ func (w *nfdTopologyUpdater) Run() error {
 	// zonesChannel := make(chan v1alpha1.ZoneList)
 	var zones v1alpha1.ZoneList
 
-	resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient)
+	excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, nfdclient.NodeName())
+	resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList)
 	if err != nil {
 		return fmt.Errorf("failed to obtain node resource information: %w", err)
 	}
@@ -245,3 +264,27 @@ func advertiseNodeTopology(client pb.NodeTopologyClient, zoneInfo v1alpha1.ZoneL
 
 	return nil
 }
+
+func (w *nfdTopologyUpdater) configure() error {
+	if w.configFilePath == "" {
+		klog.Warningf("file path for nfd-topology-updater conf file is empty")
+		return nil
+	}
+
+	b, err := os.ReadFile(w.configFilePath)
+	if err != nil {
+		// config is optional
+		if os.IsNotExist(err) {
+			klog.Warningf("couldn't find conf file under %v", w.configFilePath)
+			return nil
+		}
+		return err
+	}
+
+	err = yaml.Unmarshal(b, w.config)
+	if err != nil {
+		return fmt.Errorf("failed to parse configuration file %q: %w", w.configFilePath, err)
+	}
+	klog.Infof("configuration file %q parsed:\n %v", w.configFilePath, w.config)
+	return nil
+}
diff --git a/pkg/resourcemonitor/excludelist.go b/pkg/resourcemonitor/excludelist.go
new file mode 100644
index 000000000..444042f46
--- /dev/null
+++ b/pkg/resourcemonitor/excludelist.go
@@ -0,0 +1,33 @@
+package resourcemonitor
+
+import (
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/klog/v2"
+)
+
+// ExcludeResourceList contains a list of resources to ignore during resources scan
+type ExcludeResourceList struct {
+	excludeList sets.String
+}
+
+// NewExcludeResourceList returns a new ExcludeResourceList built from the entries of resMap that match nodeName or the "*" wildcard.
+func NewExcludeResourceList(resMap map[string][]string, nodeName string) ExcludeResourceList {
+	excludeList := make(sets.String)
+	for k, v := range resMap {
+		if k == nodeName || k == "*" {
+			excludeList.Insert(v...)
+		}
+	}
+	return ExcludeResourceList{
+		excludeList: excludeList,
+	}
+}
+
+func (rl *ExcludeResourceList) IsExcluded(resource corev1.ResourceName) bool {
+	if rl.excludeList.Has(string(resource)) {
+		klog.V(5).InfoS("resource excluded", "resource", resource)
+		return true
+	}
+	return false
+}
diff --git a/pkg/resourcemonitor/excludelist_test.go b/pkg/resourcemonitor/excludelist_test.go
new file mode 100644
index 000000000..092d1da42
--- /dev/null
+++ b/pkg/resourcemonitor/excludelist_test.go
@@ -0,0 +1,70 @@
+package resourcemonitor
+
+import (
+	"testing"
+
+	corev1 "k8s.io/api/core/v1"
+)
+
+const (
+	cpu             = string(corev1.ResourceCPU)
+	memory          = string(corev1.ResourceMemory)
+	hugepages2Mi    = "hugepages-2Mi"
+	nicResourceName = "vendor/nic1"
+)
+
+func TestNewExcludeResourceList(t *testing.T) {
+	tests := []struct {
+		desc                      string
+		excludeListConfig         map[string][]string
+		nodeName                  string
+		expectedExcludedResources []string
+	}{
+		{
+
+			desc: "exclude list with multiple nodes",
+			excludeListConfig: map[string][]string{
+				"node1": {
+					cpu,
+					nicResourceName,
+				},
+				"node2": {
+					memory,
+					hugepages2Mi,
+				},
+			},
+			nodeName:                  "node1",
+			expectedExcludedResources: []string{cpu, nicResourceName},
+		},
+		{
+			desc: "exclude list with wild card",
+			excludeListConfig: map[string][]string{
+				"*": {
+					memory, nicResourceName,
+				},
+				"node1": {
+					cpu,
+					hugepages2Mi,
+				},
+			},
+			nodeName:                  "node2",
+			expectedExcludedResources: []string{memory, nicResourceName},
+		},
+		{
+			desc:                      "empty exclude list",
+			excludeListConfig:         map[string][]string{},
+			nodeName:                  "node1",
+			expectedExcludedResources: []string{},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Logf("test %s", tt.desc)
+		excludeList := NewExcludeResourceList(tt.excludeListConfig, tt.nodeName)
+		for _, res := range tt.expectedExcludedResources {
+			if !excludeList.IsExcluded(corev1.ResourceName(res)) {
+				t.Errorf("resource: %q expected to be excluded from node: %q", res, tt.nodeName)
+			}
+		}
+	}
+}
diff --git a/pkg/resourcemonitor/noderesourcesaggregator.go b/pkg/resourcemonitor/noderesourcesaggregator.go
index 8b08c33ae..12d4676f2 100644
--- a/pkg/resourcemonitor/noderesourcesaggregator.go
+++ b/pkg/resourcemonitor/noderesourcesaggregator.go
@@ -28,8 +28,8 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/klog/v2"
-	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
 
+	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
 	"sigs.k8s.io/node-feature-discovery/pkg/utils"
 	"sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath"
 )
@@ -46,6 +46,7 @@ type nodeResources struct {
 	topo                           *ghw.TopologyInfo
 	reservedCPUIDPerNUMA           map[int][]string
 	memoryResourcesCapacityPerNUMA utils.NumaMemoryResources
+	excludeList                    ExcludeResourceList
 }
 
 type resourceData struct {
@@ -54,7 +55,7 @@ type resourceData struct {
 	capacity    int64
 }
 
-func NewResourcesAggregator(podResourceClient podresourcesapi.PodResourcesListerClient) (ResourcesAggregator, error) {
+func NewResourcesAggregator(podResourceClient podresourcesapi.PodResourcesListerClient, excludeList ExcludeResourceList) (ResourcesAggregator, error) {
 	var err error
 
 	topo, err := ghw.Topology(ghw.WithPathOverrides(ghw.PathOverrides{
@@ -85,11 +86,11 @@ func NewResourcesAggregator(podResourceClient podresourcesapi.PodResourcesLister
 		return nil, fmt.Errorf("failed to get allocatable resources (ensure that KubeletPodResourcesGetAllocatable feature gate is enabled): %w", err)
 	}
 
-	return NewResourcesAggregatorFromData(topo, resp, memoryResourcesCapacityPerNUMA), nil
+	return NewResourcesAggregatorFromData(topo, resp, memoryResourcesCapacityPerNUMA, excludeList), nil
 }
 
 // NewResourcesAggregatorFromData is used to aggregate resource information based on the received data from underlying hardware and podresource API
-func NewResourcesAggregatorFromData(topo *ghw.TopologyInfo, resp *podresourcesapi.AllocatableResourcesResponse, memoryResourceCapacity utils.NumaMemoryResources) ResourcesAggregator {
+func NewResourcesAggregatorFromData(topo *ghw.TopologyInfo, resp *podresourcesapi.AllocatableResourcesResponse, memoryResourceCapacity utils.NumaMemoryResources, excludeList ExcludeResourceList) ResourcesAggregator {
 	allDevs := getContainerDevicesFromAllocatableResources(resp, topo)
 	return &nodeResources{
 		topo: topo,
@@ -97,6 +98,7 @@ func NewResourcesAggregatorFromData(topo *ghw.TopologyInfo, resp *podresourcesap
 		perNUMAAllocatable:             makeNodeAllocatable(allDevs, resp.GetMemory()),
 		reservedCPUIDPerNUMA:           makeReservedCPUMap(topo.Nodes, allDevs),
 		memoryResourcesCapacityPerNUMA: memoryResourceCapacity,
+		excludeList:                    excludeList,
 	}
 }
 
@@ -108,6 +110,9 @@ func (noderesourceData *nodeResources) Aggregate(podResData []PodResources) topo
 		if ok {
 			perNuma[nodeID] = make(map[corev1.ResourceName]*resourceData)
 			for resName, allocatable := range nodeRes {
+				if noderesourceData.excludeList.IsExcluded(resName) {
+					continue
+				}
 				switch {
 				case resName == "cpu":
 					perNuma[nodeID][resName] = &resourceData{
diff --git a/pkg/resourcemonitor/noderesourcesaggregator_test.go b/pkg/resourcemonitor/noderesourcesaggregator_test.go
index a5ddeca89..011c14de9 100644
--- a/pkg/resourcemonitor/noderesourcesaggregator_test.go
+++ b/pkg/resourcemonitor/noderesourcesaggregator_test.go
@@ -178,7 +178,7 @@ func TestResourcesAggregator(t *testing.T) {
 				corev1.ResourceName("hugepages-2Mi"): 2048,
 			},
 		}
-		resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes, memoryResourcesCapacity)
+		resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes, memoryResourcesCapacity, NewExcludeResourceList(map[string][]string{}, ""))
 
 		Convey("When aggregating resources", func() {
 			expected := topologyv1alpha1.ZoneList{
@@ -376,7 +376,7 @@ func TestResourcesAggregator(t *testing.T) {
 			},
 		}
 
-		resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes, memoryResourcesCapacity)
+		resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes, memoryResourcesCapacity, NewExcludeResourceList(map[string][]string{}, ""))
 
 		Convey("When aggregating resources", func() {
 			podRes := []PodResources{
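
To make the new excludeList knob concrete: NFDConfig is populated from a YAML file (default /etc/kubernetes/node-feature-discovery/nfd-topology-updater.conf, overridable with the new -config flag), whose excludeList stanza maps a node name, or the "*" wildcard, to the resource names to drop from the scan. The sketch below is illustrative only — the config contents, node name and resource values are made up — but it exercises the same sigs.k8s.io/yaml unmarshalling that configure() performs and the same NewExcludeResourceList()/IsExcluded() pair that Run() and Aggregate() wire together: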
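
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
	"sigs.k8s.io/yaml"
)

// NFDConfig mirrors the struct added to nfd-topology-updater.go above.
type NFDConfig struct {
	ExcludeList map[string][]string
}

func main() {
	// Hypothetical config file contents; "*" applies to every node,
	// a node-name key only to that node.
	conf := []byte(`excludeList:
  "*":
    - memory
  node1:
    - cpu
    - hugepages-2Mi
`)

	var config NFDConfig
	if err := yaml.Unmarshal(conf, &config); err != nil {
		panic(err)
	}

	// On node1, both the wildcard and the node-specific entries apply.
	excludeList := resourcemonitor.NewExcludeResourceList(config.ExcludeList, "node1")
	for _, res := range []corev1.ResourceName{"cpu", "memory", "hugepages-2Mi", "pods"} {
		fmt.Printf("%-13s excluded: %v\n", res, excludeList.IsExcluded(res))
	}
	// Prints true for cpu, memory and hugepages-2Mi; false for pods.
}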
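
Two behaviors in the diff are worth calling out. First, configure() treats the config file as optional: a missing file only logs a warning and leaves the exclude list empty, while a file that exists but fails to parse aborts startup. Second, the filtering is applied inside Aggregate(), so an excluded resource is omitted from the per-NUMA zone report entirely rather than advertised with zero capacity.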