1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2024-12-14 11:57:51 +00:00

Fix Topology Manager policy and scope not being updated properly

NFD is only detecting policy and scope of Topology Manager when NRT object doesn't exist.
This means that topologyManagerScope and topologyManagerPolicy attributes won't be updated
even if kubelet config was changed to use other TopologyManager policy and scope.

Signed-off-by: pprokop <pprokop@nvidia.com>
This commit is contained in:
pprokop 2023-07-04 10:46:52 +02:00 committed by PiotrProkop
parent fd0ba3f9d9
commit 6d98b6150b
2 changed files with 103 additions and 72 deletions

View file

@ -19,19 +19,16 @@ package main
import (
"flag"
"fmt"
"net/url"
"os"
"path"
"time"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)
@ -63,14 +60,8 @@ func main() {
// Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog()
klConfig, err := getKubeletConfig(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
klog.ErrorS(err, "failed to get kubelet configuration")
os.Exit(1)
}
// Get new TopologyUpdater instance
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope)
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs)
if err != nil {
klog.ErrorS(err, "failed to initialize topology updater instance")
os.Exit(1)
@ -134,34 +125,3 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
return args, resourcemonitorArgs
}
func getKubeletConfig(uri, apiAuthTokenFile string) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
u, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
}
// init kubelet API client
var klConfig *kubeletconfigv1beta1.KubeletConfiguration
switch u.Scheme {
case "file":
klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config: %w", err)
}
return klConfig, err
case "https":
restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile)
if err != nil {
return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
}
klConfig, err = kubeconf.GetKubeletConfiguration(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
}
return klConfig, nil
}
return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}

View file

@ -18,6 +18,7 @@ package nfdtopologyupdater
import (
"fmt"
"net/url"
"os"
"path/filepath"
@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
@ -34,6 +36,7 @@ import (
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/yaml"
)
@ -66,24 +69,8 @@ type NfdTopologyUpdater interface {
Stop()
}
type staticNodeInfo struct {
nodeName string
tmPolicy string
tmScope string
}
func newStaticNodeInfo(policy, scope string) staticNodeInfo {
nodeName := utils.NodeName()
klog.InfoS("detected kubelet Topology Manager configuration", "policy", policy, "scope", scope, "nodeName", nodeName)
return staticNodeInfo{
nodeName: nodeName,
tmPolicy: policy,
tmScope: scope,
}
}
type nfdTopologyUpdater struct {
nodeInfo staticNodeInfo
nodeName string
args Args
apihelper apihelper.APIHelpers
resourcemonitorArgs resourcemonitor.Args
@ -91,10 +78,11 @@ type nfdTopologyUpdater struct {
eventSource <-chan kubeletnotifier.Info
configFilePath string
config *NFDConfig
kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error)
}
// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) (NfdTopologyUpdater, error) {
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) {
eventSource := make(chan kubeletnotifier.Info)
if args.KubeletStateDir != "" {
ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
@ -103,13 +91,20 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol
}
go ntf.Run()
}
kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
return nil, err
}
nfd := &nfdTopologyUpdater{
args: args,
resourcemonitorArgs: resourcemonitorArgs,
nodeInfo: newStaticNodeInfo(policy, scope),
stop: make(chan struct{}, 1),
nodeName: utils.NodeName(),
eventSource: eventSource,
config: &NFDConfig{},
kubeletConfigFunc: kubeletConfigFunc,
}
if args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(args.ConfigFile)
@ -117,10 +112,19 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol
return nfd, nil
}
func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, error) {
klConfig, err := w.kubeletConfigFunc()
if err != nil {
return "", "", err
}
return klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope, nil
}
// Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after
// one request if OneShot is set to 'true' in the updater args.
func (w *nfdTopologyUpdater) Run() error {
klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeInfo.nodeName)
klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeName)
podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath)
if err != nil {
@ -151,7 +155,7 @@ func (w *nfdTopologyUpdater) Run() error {
// zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha2.ZoneList
excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeInfo.nodeName)
excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName)
resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList)
if err != nil {
return fmt.Errorf("failed to obtain node resource information: %w", err)
@ -169,8 +173,13 @@ func (w *nfdTopologyUpdater) Run() error {
}
zones = resAggr.Aggregate(scanResponse.PodResources)
klog.V(1).InfoS("aggregated resources identified", "resourceZones", utils.DelayedDumper(zones))
readKubeletConfig := false
if info.Event == kubeletnotifier.IntervalBased {
readKubeletConfig = true
}
if !w.args.NoPublish {
if err = w.updateNodeResourceTopology(zones, scanResponse); err != nil {
if err = w.updateNodeResourceTopology(zones, scanResponse, readKubeletConfig); err != nil {
return err
}
}
@ -195,27 +204,29 @@ func (w *nfdTopologyUpdater) Stop() {
}
}
func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse) error {
func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse, readKubeletConfig bool) error {
cli, err := w.apihelper.GetTopologyClient()
if err != nil {
return err
}
nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeInfo.nodeName, metav1.GetOptions{})
nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeName, metav1.GetOptions{})
if errors.IsNotFound(err) {
nrtNew := v1alpha2.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: w.nodeInfo.nodeName,
Name: w.nodeName,
},
Zones: zoneInfo,
TopologyPolicies: []string{string(topologypolicy.DetectTopologyPolicy(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope))},
Attributes: createTopologyAttributes(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope),
Zones: zoneInfo,
Attributes: v1alpha2.AttributeList{},
}
if err := w.updateNRTTopologyManagerInfo(&nrtNew); err != nil {
return err
}
updateAttributes(&nrtNew.Attributes, scanResponse.Attributes)
_, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{})
if err != nil {
if _, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create NodeResourceTopology: %w", err)
}
return nil
@ -225,16 +236,41 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi
nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zoneInfo
updateAttributes(&nrtMutated.Attributes, scanResponse.Attributes)
attributes := scanResponse.Attributes
if readKubeletConfig {
if err := w.updateNRTTopologyManagerInfo(nrtMutated); err != nil {
return err
}
}
updateAttributes(&nrtMutated.Attributes, attributes)
nrtUpdated, err := cli.TopologyV1alpha2().NodeResourceTopologies().Update(context.TODO(), nrtMutated, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update NodeResourceTopology: %w", err)
}
klog.V(4).InfoS("NodeResourceTopology object updated", "nodeResourceTopology", utils.DelayedDumper(nrtUpdated))
return nil
}
func (w *nfdTopologyUpdater) updateNRTTopologyManagerInfo(nrt *v1alpha2.NodeResourceTopology) error {
policy, scope, err := w.detectTopologyPolicyAndScope()
if err != nil {
return fmt.Errorf("failed to detect TopologyManager's policy and scope: %w", err)
}
tmAttributes := createTopologyAttributes(policy, scope)
deprecatedTopologyPolicies := []string{string(topologypolicy.DetectTopologyPolicy(policy, scope))}
updateAttributes(&nrt.Attributes, tmAttributes)
nrt.TopologyPolicies = deprecatedTopologyPolicies
return nil
}
func (w *nfdTopologyUpdater) configure() error {
if w.configFilePath == "" {
klog.InfoS("no configuration file specified")
@ -290,3 +326,38 @@ func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) {
updateAttribute(lhs, attr)
}
}
func getKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) {
u, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
}
// init kubelet API client
var klConfig *kubeletconfigv1beta1.KubeletConfiguration
switch u.Scheme {
case "file":
return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config: %w", err)
}
return klConfig, err
}, nil
case "https":
restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile)
if err != nil {
return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
}
return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
klConfig, err = kubeconf.GetKubeletConfiguration(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
}
return klConfig, nil
}, nil
}
return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}