1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2024-12-14 11:57:51 +00:00

Merge pull request #1256 from PiotrProkop/fix-topo-updater-policy-and-scope-advertisment

Fix Topology Manager policy and scope not being updated after NRT creation
This commit is contained in:
Kubernetes Prow Robot 2023-07-28 00:25:54 -07:00 committed by GitHub
commit e0f10a81de
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 103 additions and 72 deletions

View file

@ -19,19 +19,16 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"net/url"
"os" "os"
"path" "path"
"time" "time"
"k8s.io/klog/v2" "k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater" topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor" "sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath" "sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/node-feature-discovery/pkg/version"
) )
@ -63,14 +60,8 @@ func main() {
// Plug klog into grpc logging infrastructure // Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog() utils.ConfigureGrpcKlog()
klConfig, err := getKubeletConfig(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
klog.ErrorS(err, "failed to get kubelet configuration")
os.Exit(1)
}
// Get new TopologyUpdater instance // Get new TopologyUpdater instance
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope) instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs)
if err != nil { if err != nil {
klog.ErrorS(err, "failed to initialize topology updater instance") klog.ErrorS(err, "failed to initialize topology updater instance")
os.Exit(1) os.Exit(1)
@ -134,34 +125,3 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
return args, resourcemonitorArgs return args, resourcemonitorArgs
} }
// getKubeletConfig loads the kubelet configuration referenced by uri.
//
// Supported URI schemes:
//   - "file":  the configuration is read from the local path in the URI.
//   - "https": a rest config is built from the URI and apiAuthTokenFile and
//     the configuration is fetched through it.
//
// Any other scheme, or an unparsable uri, results in an error.
func getKubeletConfig(uri, apiAuthTokenFile string) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
	parsed, err := url.ParseRequestURI(uri)
	if err != nil {
		return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
	}

	if parsed.Scheme == "file" {
		cfg, readErr := kubeconf.GetKubeletConfigFromLocalFile(parsed.Path)
		if readErr != nil {
			return nil, fmt.Errorf("failed to read kubelet config: %w", readErr)
		}
		return cfg, nil
	}

	if parsed.Scheme == "https" {
		// init kubelet API client
		restCfg, restErr := kubeconf.InsecureConfig(parsed.String(), apiAuthTokenFile)
		if restErr != nil {
			return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", restErr)
		}
		cfg, getErr := kubeconf.GetKubeletConfiguration(restCfg)
		if getErr != nil {
			return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", getErr)
		}
		return cfg, nil
	}

	return nil, fmt.Errorf("unsupported URI scheme: %v", parsed.Scheme)
}

View file

@ -18,6 +18,7 @@ package nfdtopologyupdater
import ( import (
"fmt" "fmt"
"net/url"
"os" "os"
"path/filepath" "path/filepath"
@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper" "sigs.k8s.io/node-feature-discovery/pkg/apihelper"
@ -34,6 +36,7 @@ import (
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor" "sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy" "sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
"sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
) )
@ -66,24 +69,8 @@ type NfdTopologyUpdater interface {
Stop() Stop()
} }
// staticNodeInfo bundles the node name with the Topology Manager policy and
// scope strings associated with that node.
type staticNodeInfo struct {
	// nodeName is the name of the node.
	nodeName string
	// tmPolicy and tmScope are the Topology Manager policy and scope.
	tmPolicy, tmScope string
}
// newStaticNodeInfo builds a staticNodeInfo from the given Topology Manager
// policy and scope, filling in the node name from utils.NodeName, and logs
// the resulting values.
func newStaticNodeInfo(policy, scope string) staticNodeInfo {
	info := staticNodeInfo{
		nodeName: utils.NodeName(),
		tmPolicy: policy,
		tmScope:  scope,
	}
	klog.InfoS("detected kubelet Topology Manager configuration", "policy", info.tmPolicy, "scope", info.tmScope, "nodeName", info.nodeName)
	return info
}
type nfdTopologyUpdater struct { type nfdTopologyUpdater struct {
nodeInfo staticNodeInfo nodeName string
args Args args Args
apihelper apihelper.APIHelpers apihelper apihelper.APIHelpers
resourcemonitorArgs resourcemonitor.Args resourcemonitorArgs resourcemonitor.Args
@ -91,10 +78,11 @@ type nfdTopologyUpdater struct {
eventSource <-chan kubeletnotifier.Info eventSource <-chan kubeletnotifier.Info
configFilePath string configFilePath string
config *NFDConfig config *NFDConfig
kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error)
} }
// NewTopologyUpdater creates a new NfdTopologyUpdater instance. // NewTopologyUpdater creates a new NfdTopologyUpdater instance.
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) (NfdTopologyUpdater, error) { func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) {
eventSource := make(chan kubeletnotifier.Info) eventSource := make(chan kubeletnotifier.Info)
if args.KubeletStateDir != "" { if args.KubeletStateDir != "" {
ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir) ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
@ -103,13 +91,20 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol
} }
go ntf.Run() go ntf.Run()
} }
kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
return nil, err
}
nfd := &nfdTopologyUpdater{ nfd := &nfdTopologyUpdater{
args: args, args: args,
resourcemonitorArgs: resourcemonitorArgs, resourcemonitorArgs: resourcemonitorArgs,
nodeInfo: newStaticNodeInfo(policy, scope),
stop: make(chan struct{}, 1), stop: make(chan struct{}, 1),
nodeName: utils.NodeName(),
eventSource: eventSource, eventSource: eventSource,
config: &NFDConfig{}, config: &NFDConfig{},
kubeletConfigFunc: kubeletConfigFunc,
} }
if args.ConfigFile != "" { if args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(args.ConfigFile) nfd.configFilePath = filepath.Clean(args.ConfigFile)
@ -117,10 +112,19 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol
return nfd, nil return nfd, nil
} }
// detectTopologyPolicyAndScope fetches the kubelet configuration through
// w.kubeletConfigFunc and returns its TopologyManagerPolicy and
// TopologyManagerScope fields, in that order. On failure both strings are
// empty and the error from kubeletConfigFunc is returned unchanged.
func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, error) {
	var policy, scope string

	kubeletCfg, err := w.kubeletConfigFunc()
	if err != nil {
		return policy, scope, err
	}

	policy = kubeletCfg.TopologyManagerPolicy
	scope = kubeletCfg.TopologyManagerScope
	return policy, scope, nil
}
// Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after // Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after
// one request if OneShot is set to 'true' in the updater args. // one request if OneShot is set to 'true' in the updater args.
func (w *nfdTopologyUpdater) Run() error { func (w *nfdTopologyUpdater) Run() error {
klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeInfo.nodeName) klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeName)
podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath) podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath)
if err != nil { if err != nil {
@ -151,7 +155,7 @@ func (w *nfdTopologyUpdater) Run() error {
// zonesChannel := make(chan v1alpha1.ZoneList) // zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha2.ZoneList var zones v1alpha2.ZoneList
excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeInfo.nodeName) excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName)
resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList) resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList)
if err != nil { if err != nil {
return fmt.Errorf("failed to obtain node resource information: %w", err) return fmt.Errorf("failed to obtain node resource information: %w", err)
@ -169,8 +173,13 @@ func (w *nfdTopologyUpdater) Run() error {
} }
zones = resAggr.Aggregate(scanResponse.PodResources) zones = resAggr.Aggregate(scanResponse.PodResources)
klog.V(1).InfoS("aggregated resources identified", "resourceZones", utils.DelayedDumper(zones)) klog.V(1).InfoS("aggregated resources identified", "resourceZones", utils.DelayedDumper(zones))
readKubeletConfig := false
if info.Event == kubeletnotifier.IntervalBased {
readKubeletConfig = true
}
if !w.args.NoPublish { if !w.args.NoPublish {
if err = w.updateNodeResourceTopology(zones, scanResponse); err != nil { if err = w.updateNodeResourceTopology(zones, scanResponse, readKubeletConfig); err != nil {
return err return err
} }
} }
@ -195,27 +204,29 @@ func (w *nfdTopologyUpdater) Stop() {
} }
} }
func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse) error { func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse, readKubeletConfig bool) error {
cli, err := w.apihelper.GetTopologyClient() cli, err := w.apihelper.GetTopologyClient()
if err != nil { if err != nil {
return err return err
} }
nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeInfo.nodeName, metav1.GetOptions{}) nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeName, metav1.GetOptions{})
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
nrtNew := v1alpha2.NodeResourceTopology{ nrtNew := v1alpha2.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: w.nodeInfo.nodeName, Name: w.nodeName,
}, },
Zones: zoneInfo, Zones: zoneInfo,
TopologyPolicies: []string{string(topologypolicy.DetectTopologyPolicy(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope))}, Attributes: v1alpha2.AttributeList{},
Attributes: createTopologyAttributes(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope), }
if err := w.updateNRTTopologyManagerInfo(&nrtNew); err != nil {
return err
} }
updateAttributes(&nrtNew.Attributes, scanResponse.Attributes) updateAttributes(&nrtNew.Attributes, scanResponse.Attributes)
_, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}) if _, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}); err != nil {
if err != nil {
return fmt.Errorf("failed to create NodeResourceTopology: %w", err) return fmt.Errorf("failed to create NodeResourceTopology: %w", err)
} }
return nil return nil
@ -225,16 +236,41 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi
nrtMutated := nrt.DeepCopy() nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zoneInfo nrtMutated.Zones = zoneInfo
updateAttributes(&nrtMutated.Attributes, scanResponse.Attributes)
attributes := scanResponse.Attributes
if readKubeletConfig {
if err := w.updateNRTTopologyManagerInfo(nrtMutated); err != nil {
return err
}
}
updateAttributes(&nrtMutated.Attributes, attributes)
nrtUpdated, err := cli.TopologyV1alpha2().NodeResourceTopologies().Update(context.TODO(), nrtMutated, metav1.UpdateOptions{}) nrtUpdated, err := cli.TopologyV1alpha2().NodeResourceTopologies().Update(context.TODO(), nrtMutated, metav1.UpdateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("failed to update NodeResourceTopology: %w", err) return fmt.Errorf("failed to update NodeResourceTopology: %w", err)
} }
klog.V(4).InfoS("NodeResourceTopology object updated", "nodeResourceTopology", utils.DelayedDumper(nrtUpdated)) klog.V(4).InfoS("NodeResourceTopology object updated", "nodeResourceTopology", utils.DelayedDumper(nrtUpdated))
return nil return nil
} }
// updateNRTTopologyManagerInfo writes the currently detected Topology Manager
// policy and scope into nrt: the attributes derived from them are merged into
// nrt.Attributes and nrt.TopologyPolicies is overwritten with the single
// detected policy value. nrt is left untouched if detection fails.
func (w *nfdTopologyUpdater) updateNRTTopologyManagerInfo(nrt *v1alpha2.NodeResourceTopology) error {
	tmPolicy, tmScope, err := w.detectTopologyPolicyAndScope()
	if err != nil {
		return fmt.Errorf("failed to detect TopologyManager's policy and scope: %w", err)
	}

	attrs := createTopologyAttributes(tmPolicy, tmScope)
	detected := topologypolicy.DetectTopologyPolicy(tmPolicy, tmScope)

	updateAttributes(&nrt.Attributes, attrs)
	// TopologyPolicies is the deprecated way of advertising the policy.
	nrt.TopologyPolicies = []string{string(detected)}
	return nil
}
func (w *nfdTopologyUpdater) configure() error { func (w *nfdTopologyUpdater) configure() error {
if w.configFilePath == "" { if w.configFilePath == "" {
klog.InfoS("no configuration file specified") klog.InfoS("no configuration file specified")
@ -290,3 +326,38 @@ func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) {
updateAttribute(lhs, attr) updateAttribute(lhs, attr)
} }
} }
// getKubeletConfigFunc validates uri and returns a function that fetches the
// kubelet configuration from it. The returned function performs a fresh read
// on every call, so callers observe configuration changes over time.
//
// Supported URI schemes:
//   - "file":  each call reads the configuration from the local path in the
//     URI.
//   - "https": a rest config is built once, up front, from the URI and
//     apiAuthTokenFile; each call fetches the configuration through it.
//
// An unparsable uri, an unsupported scheme, or a failure to build the rest
// config is reported immediately and no function is returned.
func getKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) {
	u, err := url.ParseRequestURI(uri)
	if err != nil {
		return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
	}

	switch u.Scheme {
	case "file":
		// Capture only the immutable path; results live in closure-local
		// variables so that calls never write to shared state.
		path := u.Path
		return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
			klConfig, err := kubeconf.GetKubeletConfigFromLocalFile(path)
			if err != nil {
				return nil, fmt.Errorf("failed to read kubelet config: %w", err)
			}
			return klConfig, nil
		}, nil
	case "https":
		// init kubelet API client
		restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
		}
		return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
			klConfig, err := kubeconf.GetKubeletConfiguration(restConfig)
			if err != nil {
				return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
			}
			return klConfig, nil
		}, nil
	}

	return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}