
topology-updater: introduce exclude-list

The exclude-list allows filtering specific resource accounting
out of NRT objects on a per-node basis.

The CRs created by the topology-updater are used by the scheduler-plugin
as a source of truth for making scheduling decisions.
As such, this feature makes it possible to hide specific information
from the scheduler, which in turn
affects the scheduling decision.
A common use case is when a user wants to make scheduling
decisions based on a specific resource.
In that case, we can exclude all the other resources
that we don't want the scheduler to examine.

The exclude-list is provided to the topology-updater via a ConfigMap.
Resource type names specified in the list should match the names
as shown here: https://pkg.go.dev/k8s.io/api/core/v1#ResourceName
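
As an illustration, a minimal nfd-topology-updater.conf (typically mounted
into the pod from that ConfigMap) might look like the sketch below. The
excludeList key mirrors the ExcludeList field of the NFDConfig struct added
in this commit; the node names and resource names are only examples, and the
"*" key applies the entry to every node:

excludeList:
  node1:
    - cpu
    - vendor/nic1
  "*":
    - hugepages-2Mi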

This is a resurrection of an old work started here:
https://github.com/kubernetes-sigs/node-feature-discovery/pull/545

Signed-off-by: Talor Itzhak <titzhak@redhat.com>
Talor Itzhak 2022-11-02 16:01:25 +02:00
parent 2c0b6f345f
commit 5b0788ced4
7 changed files with 166 additions and 10 deletions


@@ -155,6 +155,8 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
"NFD server address to connecto to.")
flagset.StringVar(&args.ServerNameOverride, "server-name-override", "",
"Hostname expected from server certificate, useful in testing")
+flagset.StringVar(&args.ConfigFile, "config", "/etc/kubernetes/node-feature-discovery/nfd-topology-updater.conf",
+"Config file to use.")

klog.InitFlags(flagset)


@@ -34,19 +34,22 @@ func TestArgsParse(t *testing.T) {
Convey("noPublish is set and args.sources is set to the default value", func() {
So(args.NoPublish, ShouldBeTrue)
So(args.Oneshot, ShouldBeTrue)
+So(args.ConfigFile, ShouldEqual, "/etc/kubernetes/node-feature-discovery/nfd-topology-updater.conf")
So(finderArgs.SleepInterval, ShouldEqual, 60*time.Second)
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})

-Convey("When valid args are specified for -kubelet-config-url and -sleep-interval,", func() {
+Convey("When valid args are specified for -kubelet-config-url, -sleep-interval and -config,", func() {
args, finderArgs := parseArgs(flags,
"-kubelet-config-uri=file:///path/testconfig.yaml",
-"-sleep-interval=30s")
+"-sleep-interval=30s",
+"-config=/path/nfd-topology-updater.conf")

Convey("args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeFalse)
So(args.Oneshot, ShouldBeFalse)
+So(args.ConfigFile, ShouldEqual, "/path/nfd-topology-updater.conf")
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigURI, ShouldEqual, "file:///path/testconfig.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")


@@ -18,6 +18,8 @@ package topologyupdater
import (
"fmt"
+"os"
+"path/filepath"
"time"

"k8s.io/klog/v2"
@@ -32,6 +34,7 @@ import (
pb "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
+"sigs.k8s.io/yaml"
)

// Args are the command line arguments
@@ -40,6 +43,12 @@ type Args struct {
NoPublish bool
Oneshot bool
KubeConfigFile string
+ConfigFile string
+}
+
+// NFDConfig contains the configuration settings of NFDTopologyUpdater.
+type NFDConfig struct {
+ExcludeList map[string][]string
}

type NfdTopologyUpdater interface {
@@ -59,6 +68,8 @@ type nfdTopologyUpdater struct {
certWatch *utils.FsWatcher
client pb.NodeTopologyClient
stop chan struct{} // channel for signaling stop
+configFilePath string
+config *NFDConfig
}

// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
@@ -76,6 +87,10 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol
tmPolicy: policy,
},
stop: make(chan struct{}, 1),
+config: &NFDConfig{},
+}
+if args.ConfigFile != "" {
+nfd.configFilePath = filepath.Clean(args.ConfigFile)
}
return nfd, nil
}
@@ -99,6 +114,9 @@ func (w *nfdTopologyUpdater) Run() error {
}
kubeApihelper = apihelper.K8sHelpers{Kubeconfig: kubeconfig}
}
+if err := w.configure(); err != nil {
+return fmt.Errorf("faild to configure Node Feature Discovery Topology Updater: %w", err)
+}

var resScan resourcemonitor.ResourcesScanner
@@ -113,7 +131,8 @@ func (w *nfdTopologyUpdater) Run() error {
// zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha1.ZoneList

-resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient)
+excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, nfdclient.NodeName())
+resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList)
if err != nil {
return fmt.Errorf("failed to obtain node resource information: %w", err)
}
@@ -245,3 +264,27 @@ func advertiseNodeTopology(client pb.NodeTopologyClient, zoneInfo v1alpha1.ZoneL
return nil
}
+
+func (w *nfdTopologyUpdater) configure() error {
+if w.configFilePath == "" {
+klog.Warningf("file path for nfd-topology-updater conf file is empty")
+return nil
+}
+
+b, err := os.ReadFile(w.configFilePath)
+if err != nil {
+// config is optional
+if os.IsNotExist(err) {
+klog.Warningf("couldn't find conf file under %v", w.configFilePath)
+return nil
+}
+return err
+}
+
+err = yaml.Unmarshal(b, w.config)
+if err != nil {
+return fmt.Errorf("failed to parse configuration file %q: %w", w.configFilePath, err)
+}
+klog.Infof("configuration file %q parsed:\n %v", w.configFilePath, w.config)
+return nil
+}


@@ -0,0 +1,33 @@
+package resourcemonitor
+
+import (
+corev1 "k8s.io/api/core/v1"
+"k8s.io/apimachinery/pkg/util/sets"
+"k8s.io/klog/v2"
+)
+
+// ExcludeResourceList contains a list of resources to ignore during resources scan
+type ExcludeResourceList struct {
+excludeList sets.String
+}
+
+// NewExcludeResourceList returns new ExcludeList with values with set.String types
+func NewExcludeResourceList(resMap map[string][]string, nodeName string) ExcludeResourceList {
+excludeList := make(sets.String)
+for k, v := range resMap {
+if k == nodeName || k == "*" {
+excludeList.Insert(v...)
+}
+}
+return ExcludeResourceList{
+excludeList: excludeList,
+}
+}
+
+func (rl *ExcludeResourceList) IsExcluded(resource corev1.ResourceName) bool {
+if rl.excludeList.Has(string(resource)) {
+klog.V(5).InfoS("resource excluded", "resource", resource)
+return true
+}
+return false
+}


@@ -0,0 +1,70 @@
+package resourcemonitor
+
+import (
+"testing"
+
+corev1 "k8s.io/api/core/v1"
+)
+
+const (
+cpu = string(corev1.ResourceCPU)
+memory = string(corev1.ResourceMemory)
+hugepages2Mi = "hugepages-2Mi"
+nicResourceName = "vendor/nic1"
+)
+
+func TestNewExcludeResourceList(t *testing.T) {
+tests := []struct {
+desc string
+excludeListConfig map[string][]string
+nodeName string
+expectedExcludedResources []string
+}{
+{
+desc: "exclude list with multiple nodes",
+excludeListConfig: map[string][]string{
+"node1": {
+cpu,
+nicResourceName,
+},
+"node2": {
+memory,
+hugepages2Mi,
+},
+},
+nodeName: "node1",
+expectedExcludedResources: []string{cpu, nicResourceName},
+},
+{
+desc: "exclude list with wild card",
+excludeListConfig: map[string][]string{
+"*": {
+memory, nicResourceName,
+},
+"node1": {
+cpu,
+hugepages2Mi,
+},
+},
+nodeName: "node2",
+expectedExcludedResources: []string{memory, nicResourceName},
+},
+{
+desc: "empty exclude list",
+excludeListConfig: map[string][]string{},
+nodeName: "node1",
+expectedExcludedResources: []string{},
+},
+}
+
+for _, tt := range tests {
+t.Logf("test %s", tt.desc)
+excludeList := NewExcludeResourceList(tt.excludeListConfig, tt.nodeName)
+for _, res := range tt.expectedExcludedResources {
+if !excludeList.IsExcluded(corev1.ResourceName(res)) {
+t.Errorf("resource: %q expected to be excluded from node: %q", res, tt.nodeName)
+}
+}
+}
+}


@@ -28,8 +28,8 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
-podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
+podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"

"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath"
)
@@ -46,6 +46,7 @@ type nodeResources struct {
topo *ghw.TopologyInfo
reservedCPUIDPerNUMA map[int][]string
memoryResourcesCapacityPerNUMA utils.NumaMemoryResources
+excludeList ExcludeResourceList
}

type resourceData struct {
@@ -54,7 +55,7 @@ type resourceData struct {
capacity int64
}

-func NewResourcesAggregator(podResourceClient podresourcesapi.PodResourcesListerClient) (ResourcesAggregator, error) {
+func NewResourcesAggregator(podResourceClient podresourcesapi.PodResourcesListerClient, excludeList ExcludeResourceList) (ResourcesAggregator, error) {
var err error

topo, err := ghw.Topology(ghw.WithPathOverrides(ghw.PathOverrides{
@@ -85,11 +86,11 @@ func NewResourcesAggregator(podResourceClient podresourcesapi.PodResourcesLister
return nil, fmt.Errorf("failed to get allocatable resources (ensure that KubeletPodResourcesGetAllocatable feature gate is enabled): %w", err)
}

-return NewResourcesAggregatorFromData(topo, resp, memoryResourcesCapacityPerNUMA), nil
+return NewResourcesAggregatorFromData(topo, resp, memoryResourcesCapacityPerNUMA, excludeList), nil
}

// NewResourcesAggregatorFromData is used to aggregate resource information based on the received data from underlying hardware and podresource API
-func NewResourcesAggregatorFromData(topo *ghw.TopologyInfo, resp *podresourcesapi.AllocatableResourcesResponse, memoryResourceCapacity utils.NumaMemoryResources) ResourcesAggregator {
+func NewResourcesAggregatorFromData(topo *ghw.TopologyInfo, resp *podresourcesapi.AllocatableResourcesResponse, memoryResourceCapacity utils.NumaMemoryResources, excludeList ExcludeResourceList) ResourcesAggregator {
allDevs := getContainerDevicesFromAllocatableResources(resp, topo)
return &nodeResources{
topo: topo,
@@ -97,6 +98,7 @@ func NewResourcesAggregatorFromData(topo *ghw.TopologyInfo, resp *podresourcesap
perNUMAAllocatable: makeNodeAllocatable(allDevs, resp.GetMemory()),
reservedCPUIDPerNUMA: makeReservedCPUMap(topo.Nodes, allDevs),
memoryResourcesCapacityPerNUMA: memoryResourceCapacity,
+excludeList: excludeList,
}
}
@@ -108,6 +110,9 @@ func (noderesourceData *nodeResources) Aggregate(podResData []PodResources) topo
if ok {
perNuma[nodeID] = make(map[corev1.ResourceName]*resourceData)
for resName, allocatable := range nodeRes {
+if noderesourceData.excludeList.IsExcluded(resName) {
+continue
+}
switch {
case resName == "cpu":
perNuma[nodeID][resName] = &resourceData{


@@ -178,7 +178,7 @@ func TestResourcesAggregator(t *testing.T) {
corev1.ResourceName("hugepages-2Mi"): 2048,
},
}

-resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes, memoryResourcesCapacity)
+resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes, memoryResourcesCapacity, NewExcludeResourceList(map[string][]string{}, ""))

Convey("When aggregating resources", func() {
expected := topologyv1alpha1.ZoneList{
@@ -376,7 +376,7 @@ func TestResourcesAggregator(t *testing.T) {
},
}

-resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes, memoryResourcesCapacity)
+resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes, memoryResourcesCapacity, NewExcludeResourceList(map[string][]string{}, ""))

Convey("When aggregating resources", func() {
podRes := []PodResources{