mirror of
https://github.com/kubernetes-sigs/node-feature-discovery.git
synced 2025-03-14 20:56:42 +00:00
Merge pull request #1031 from k8stopologyawareschedwg/reactive_updates
topology-updater: reactive updates
This commit is contained in:
commit
13f92faa77
11 changed files with 308 additions and 56 deletions
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
@ -40,6 +41,8 @@ const (
|
|||
kubeletSecurePort = 10250
|
||||
)
|
||||
|
||||
var DefaultKubeletStateDir = path.Join(string(hostpath.VarDir), "lib", "kubelet")
|
||||
|
||||
func main() {
|
||||
flags := flag.NewFlagSet(ProgramName, flag.ExitOnError)
|
||||
|
||||
|
@ -88,7 +91,10 @@ func main() {
|
|||
}
|
||||
|
||||
// Get new TopologyUpdater instance
|
||||
instance := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope)
|
||||
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope)
|
||||
if err != nil {
|
||||
klog.Exit(err)
|
||||
}
|
||||
|
||||
if err = instance.Run(); err != nil {
|
||||
klog.Exit(err)
|
||||
|
@ -128,7 +134,7 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
|
|||
flagset.StringVar(&args.KubeConfigFile, "kubeconfig", "",
|
||||
"Kube config file.")
|
||||
flagset.DurationVar(&resourcemonitorArgs.SleepInterval, "sleep-interval", time.Duration(60)*time.Second,
|
||||
"Time to sleep between CR updates. Non-positive value implies no CR updatation (i.e. infinite sleep). [Default: 60s]")
|
||||
"Time to sleep between CR updates. zero means no CR updates on interval basis. [Default: 60s]")
|
||||
flagset.StringVar(&resourcemonitorArgs.Namespace, "watch-namespace", "*",
|
||||
"Namespace to watch pods (for testing/debugging purpose). Use * for all namespaces.")
|
||||
flagset.StringVar(&resourcemonitorArgs.KubeletConfigURI, "kubelet-config-uri", "",
|
||||
|
@ -140,6 +146,7 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
|
|||
flagset.StringVar(&args.ConfigFile, "config", "/etc/kubernetes/node-feature-discovery/nfd-topology-updater.conf",
|
||||
"Config file to use.")
|
||||
flagset.BoolVar(&resourcemonitorArgs.PodSetFingerprint, "pods-fingerprint", false, "Compute and report the pod set fingerprint")
|
||||
flagset.StringVar(&args.KubeletStateDir, "kubelet-state-dir", DefaultKubeletStateDir, "Kubelet state directory path for watching state and checkpoint files")
|
||||
|
||||
klog.InitFlags(flagset)
|
||||
|
||||
|
|
|
@ -10,6 +10,9 @@
|
|||
- name: nfd-topology-updater-conf
|
||||
configMap:
|
||||
name: nfd-topology-updater-conf
|
||||
- name: kubelet-state-files
|
||||
hostPath:
|
||||
path: /var/lib/kubelet
|
||||
|
||||
- op: add
|
||||
path: /spec/template/spec/containers/0/volumeMounts
|
||||
|
@ -21,6 +24,9 @@
|
|||
- name: nfd-topology-updater-conf
|
||||
mountPath: "/etc/kubernetes/node-feature-discovery"
|
||||
readOnly: true
|
||||
- name: kubelet-state-files
|
||||
mountPath: /host-var/lib/kubelet
|
||||
readOnly: true
|
||||
|
||||
- op: add
|
||||
path: /spec/template/spec/containers/0/args/-
|
||||
|
|
|
@ -70,6 +70,11 @@ spec:
|
|||
mountPath: /host-var/lib/kubelet/pod-resources/kubelet.sock
|
||||
- name: host-sys
|
||||
mountPath: /host-sys
|
||||
{{- if .Values.topologyUpdater.kubeletStateDir | empty | not }}
|
||||
- name: kubelet-state-files
|
||||
mountPath: /host-var/lib/kubelet
|
||||
readOnly: true
|
||||
{{- end }}
|
||||
{{- if .Values.tls.enable }}
|
||||
- name: nfd-topology-updater-cert
|
||||
mountPath: "/etc/kubernetes/node-feature-discovery/certs"
|
||||
|
@ -99,6 +104,11 @@ spec:
|
|||
{{- else }}
|
||||
path: /var/lib/kubelet/pod-resources/kubelet.sock
|
||||
{{- end }}
|
||||
{{- if .Values.topologyUpdater.kubeletStateDir | empty | not }}
|
||||
- name: kubelet-state-files
|
||||
hostPath:
|
||||
path: {{ .Values.topologyUpdater.kubeletStateDir }}
|
||||
{{- end }}
|
||||
- name: nfd-topology-updater-conf
|
||||
configMap:
|
||||
name: {{ include "node-feature-discovery.fullname" . }}-topology-updater-conf
|
||||
|
|
|
@ -399,6 +399,7 @@ topologyUpdater:
|
|||
kubeletPodResourcesSockPath:
|
||||
updateInterval: 60s
|
||||
watchNamespace: "*"
|
||||
kubeletStateDir: /host-var/lib/kubelet
|
||||
|
||||
podSecurityContext: {}
|
||||
securityContext:
|
||||
|
|
|
@ -152,28 +152,29 @@ We have introduced the following Chart parameters.
|
|||
|
||||
### Topology updater parameters
|
||||
|
||||
| Name | Type | Default | description |
|
||||
|-----------------------------------------------|--------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `topologyUpdater.*` | dict | | NFD Topology Updater configuration |
|
||||
| `topologyUpdater.enable` | bool | false | Specifies whether the NFD Topology Updater should be created |
|
||||
| `topologyUpdater.createCRDs` | bool | false | Specifies whether the NFD Topology Updater CRDs should be created |
|
||||
| `topologyUpdater.serviceAccount.create` | bool | true | Specifies whether the service account for topology updater should be created |
|
||||
| `topologyUpdater.serviceAccount.annotations` | dict | {} | Annotations to add to the service account for topology updater |
|
||||
| `topologyUpdater.serviceAccount.name` | string | | The name of the service account for topology updater to use. If not set and create is true, a name is generated using the fullname template and `-topology-updater` suffix |
|
||||
| `topologyUpdater.rbac.create` | bool | false | Specifies whether to create [RBAC][rbac] configuration for topology updater |
|
||||
| `topologyUpdater.kubeletConfigPath` | string | "" | Specifies the kubelet config host path |
|
||||
| `topologyUpdater.kubeletPodResourcesSockPath` | string | "" | Specifies the kubelet sock path to read pod resources |
|
||||
| `topologyUpdater.updateInterval` | string | 60s | Time to sleep between CR updates. Non-positive value implies no CR update. |
|
||||
| `topologyUpdater.watchNamespace` | string | `*` | Namespace to watch pods, `*` for all namespaces |
|
||||
| `topologyUpdater.podSecurityContext` | dict | {} | [PodSecurityContext](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod) holds pod-level security attributes and common container settings |
|
||||
| `topologyUpdater.securityContext` | dict | {} | Container [security settings](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-container) |
|
||||
| `topologyUpdater.resources` | dict | {} | Topology updater pod [resources management](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/) |
|
||||
| `topologyUpdater.nodeSelector` | dict | {} | Topology updater pod [node selector](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector) |
|
||||
| `topologyUpdater.tolerations` | dict | {} | Topology updater pod [node tolerations](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/) |
|
||||
| `topologyUpdater.annotations` | dict | {} | Topology updater pod [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
|
||||
| `topologyUpdater.affinity` | dict | {} | Topology updater pod [affinity](https://kubernetes.io/docs/tasks/configure-pod-container/assign-pods-nodes-using-node-affinity/) |
|
||||
| `topologyUpdater.config` | dict | | [configuration](../reference/topology-updater-configuration-reference) |
|
||||
| `topologyUpdater.podSetFingerprint` | bool | false | Enables compute and report of pod fingerprint in NRT objects. |
|
||||
| Name | Type | Default | description |
|
||||
|-----------------------------------------------|--------|-------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `topologyUpdater.*` | dict | | NFD Topology Updater configuration |
|
||||
| `topologyUpdater.enable` | bool | false | Specifies whether the NFD Topology Updater should be created |
|
||||
| `topologyUpdater.createCRDs` | bool | false | Specifies whether the NFD Topology Updater CRDs should be created |
|
||||
| `topologyUpdater.serviceAccount.create` | bool | true | Specifies whether the service account for topology updater should be created |
|
||||
| `topologyUpdater.serviceAccount.annotations` | dict | {} | Annotations to add to the service account for topology updater |
|
||||
| `topologyUpdater.serviceAccount.name` | string | | The name of the service account for topology updater to use. If not set and create is true, a name is generated using the fullname template and `-topology-updater` suffix |
|
||||
| `topologyUpdater.rbac.create` | bool | false | Specifies whether to create [RBAC][rbac] configuration for topology updater |
|
||||
| `topologyUpdater.kubeletConfigPath` | string | "" | Specifies the kubelet config host path |
|
||||
| `topologyUpdater.kubeletPodResourcesSockPath` | string | "" | Specifies the kubelet sock path to read pod resources |
|
||||
| `topologyUpdater.updateInterval` | string | 60s | Time to sleep between CR updates. Non-positive value implies no CR update. |
|
||||
| `topologyUpdater.watchNamespace` | string | `*` | Namespace to watch pods, `*` for all namespaces |
|
||||
| `topologyUpdater.podSecurityContext` | dict | {} | [PodSecurityContext](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod) holds pod-level security attributes and common container settings |
|
||||
| `topologyUpdater.securityContext` | dict | {} | Container [security settings](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-container) |
|
||||
| `topologyUpdater.resources` | dict | {} | Topology updater pod [resources management](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/) |
|
||||
| `topologyUpdater.nodeSelector` | dict | {} | Topology updater pod [node selector](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector) |
|
||||
| `topologyUpdater.tolerations` | dict | {} | Topology updater pod [node tolerations](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/) |
|
||||
| `topologyUpdater.annotations` | dict | {} | Topology updater pod [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
|
||||
| `topologyUpdater.affinity` | dict | {} | Topology updater pod [affinity](https://kubernetes.io/docs/tasks/configure-pod-container/assign-pods-nodes-using-node-affinity/) |
|
||||
| `topologyUpdater.config` | dict | | [configuration](../reference/topology-updater-configuration-reference) |
|
||||
| `topologyUpdater.podSetFingerprint` | bool | false | Enables compute and report of pod fingerprint in NRT objects. |
|
||||
| `topologyUpdater.kubeletStateDir` | string | "/host-var/lib/kubelet" | Specifies kubelet state directory path for watching state and checkpoint files. Empty value disables kubelet state tracking. |
|
||||
|
||||
### Topology garbage collector parameters
|
||||
|
||||
|
|
|
@ -77,8 +77,7 @@ nfd-topology-updater -oneshot -no-publish
|
|||
### -sleep-interval
|
||||
|
||||
The `-sleep-interval` specifies the interval between resource hardware
|
||||
topology re-examination (and CR updates). A non-positive value implies
|
||||
infinite sleep interval, i.e. no re-detection is done.
|
||||
topology re-examination (and CR updates). Zero means no CR updates on an interval basis.
|
||||
|
||||
Default: 60s
|
||||
|
||||
|
@ -150,7 +149,7 @@ nfd-topology-updater -podresources-socket=/var/lib/kubelet/pod-resources/kubelet
|
|||
|
||||
### -pods-fingerprint
|
||||
|
||||
Enbles the compute and report the pod set fingerprint in the NRT.
|
||||
Enables compute and report the pod set fingerprint in the NRT.
|
||||
A pod fingerprint is a compact representation of the "node state" regarding resources.
|
||||
|
||||
Default: `false`
|
||||
|
@ -160,3 +159,19 @@ Example:
|
|||
```bash
|
||||
nfd-topology-updater -pods-fingerprint
|
||||
```
|
||||
|
||||
### -kubelet-state-dir
|
||||
|
||||
The `-kubelet-state-dir` specifies the path to the Kubelet state directory,
|
||||
where state and checkpoint files are stored.
|
||||
The files are mounted as read-only and cannot be changed by the updater.
|
||||
Enabled by default.
|
||||
Passing an empty string will disable the watching.
|
||||
|
||||
Default: /host-var/lib/kubelet
|
||||
|
||||
Example:
|
||||
|
||||
```bash
|
||||
nfd-topology-updater -kubelet-state-dir=/var/lib/kubelet
|
||||
```
|
||||
|
|
|
@ -9,19 +9,28 @@ sort: 5
|
|||
|
||||
---
|
||||
|
||||
NFD-Topology-Updater is preferably run as a Kubernetes DaemonSet. This assures
|
||||
re-examination on regular intervals, capturing changes in the allocated
|
||||
resources and hence the allocatable resources on a per zone basis by updating
|
||||
NFD-Topology-Updater is preferably run as a Kubernetes DaemonSet.
|
||||
This assures re-examination on regular intervals
|
||||
and/or per pod life-cycle events, capturing changes in the allocated
|
||||
resources and hence the allocatable resources on a per-zone basis by updating
|
||||
[NodeResourceTopology](custom-resources.md#noderesourcetopology) custom resources.
|
||||
It makes sure that new NodeResourceTopology instances are created for each new
|
||||
nodes that get added to the cluster.
|
||||
|
||||
When run as a daemonset, nodes are re-examined for the allocated resources
|
||||
(to determine the information of the allocatable resources on a per zone basis
|
||||
(to determine the information of the allocatable resources on a per-zone basis
|
||||
where a zone can be a NUMA node) at an interval specified using the
|
||||
[`-sleep-interval`](../reference/topology-updater-commandline-reference.html.md#-sleep-interval)
|
||||
option. The default sleep interval is set to 60s which is the value when no
|
||||
-sleep-interval is specified.
|
||||
option. The default sleep interval is set to 60s
|
||||
which is the value when no -sleep-interval is specified.
|
||||
The re-examination can be disabled by setting the sleep-interval to 0.
|
||||
|
||||
Another option is to configure the updater to update
|
||||
the allocated resources per pod life-cycle events.
|
||||
The updater will monitor the checkpoint file stated in
|
||||
[`-kubelet-state-dir`](../reference/topology-updater-commandline-reference.md#-kubelet-state-dir)
|
||||
and triggers an update for every change that occurs in the files.
|
||||
|
||||
In addition, it can avoid examining specific allocated resources
|
||||
given a configuration of resources to exclude via [`-excludeList`](../reference/topology-updater-configuration-reference.md#excludelist)
|
||||
|
||||
|
|
100
pkg/nfd-topology-updater/kubeletnotifier/kubeletnotifier.go
Normal file
100
pkg/nfd-topology-updater/kubeletnotifier/kubeletnotifier.go
Normal file
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubeletnotifier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
// EventType distinguishes what triggered a notification.
type EventType string

const (
	// IntervalBased marks a notification fired by the periodic timer.
	IntervalBased EventType = "intervalBased"
	// FSUpdate marks a notification fired by a filesystem change in one
	// of the watched kubelet state files.
	FSUpdate EventType = "fsUpdate"
)
|
||||
|
||||
// stateFiles holds the bare file names (not full paths) of the kubelet
// state/checkpoint files whose modification should trigger an update.
var stateFiles = sets.NewString(
	"cpu_manager_state",
	"memory_manager_state",
	"kubelet_internal_checkpoint",
)
|
||||
|
||||
// Notifier multiplexes periodic (interval-based) and filesystem-driven
// notifications onto a single destination channel.
type Notifier struct {
	// sleepInterval is the period of the timer-based notifications;
	// a non-positive value disables them (see Run).
	sleepInterval time.Duration
	// destination where notifications are sent
	dest chan<- Info
	// fsEvent delivers fsnotify events for the watched kubelet state dir.
	fsEvent <-chan fsnotify.Event
}

// Info describes a single notification and what triggered it.
type Info struct {
	Event EventType
}
|
||||
|
||||
func New(sleepInterval time.Duration, dest chan<- Info, kubeletStateDir string) (*Notifier, error) {
|
||||
ch, err := createFSWatcherEvent([]string{kubeletStateDir})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Notifier{
|
||||
sleepInterval: sleepInterval,
|
||||
dest: dest,
|
||||
fsEvent: ch,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *Notifier) Run() {
|
||||
timeEvents := make(<-chan time.Time)
|
||||
if n.sleepInterval > 0 {
|
||||
ticker := time.NewTicker(n.sleepInterval)
|
||||
timeEvents = ticker.C
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timeEvents:
|
||||
klog.V(5).Infof("timer update received")
|
||||
i := Info{Event: IntervalBased}
|
||||
n.dest <- i
|
||||
|
||||
case e := <-n.fsEvent:
|
||||
klog.V(5).Infof("fsnotify event from file %q: %q received", e.Name, e.Op)
|
||||
if stateFiles.Has(e.Name) {
|
||||
i := Info{Event: FSUpdate}
|
||||
n.dest <- i
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createFSWatcherEvent(fsWatchPaths []string) (chan fsnotify.Event, error) {
|
||||
fsWatcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, path := range fsWatchPaths {
|
||||
if err = fsWatcher.Add(path); err != nil {
|
||||
return nil, fmt.Errorf("failed to watch: %q; %w", path, err)
|
||||
}
|
||||
}
|
||||
return fsWatcher.Events, nil
|
||||
}
|
|
@ -20,16 +20,16 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
|
||||
"golang.org/x/net/context"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater/kubeletnotifier"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/podres"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
|
||||
|
@ -47,10 +47,11 @@ const (
|
|||
|
||||
// Args are the command line arguments
|
||||
type Args struct {
|
||||
NoPublish bool
|
||||
Oneshot bool
|
||||
KubeConfigFile string
|
||||
ConfigFile string
|
||||
NoPublish bool
|
||||
Oneshot bool
|
||||
KubeConfigFile string
|
||||
ConfigFile string
|
||||
KubeletStateDir string
|
||||
|
||||
Klog map[string]*utils.KlogFlagVal
|
||||
}
|
||||
|
@ -87,23 +88,33 @@ type nfdTopologyUpdater struct {
|
|||
apihelper apihelper.APIHelpers
|
||||
resourcemonitorArgs resourcemonitor.Args
|
||||
stop chan struct{} // channel for signaling stop
|
||||
eventSource <-chan kubeletnotifier.Info
|
||||
configFilePath string
|
||||
config *NFDConfig
|
||||
}
|
||||
|
||||
// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
|
||||
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) NfdTopologyUpdater {
|
||||
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) (NfdTopologyUpdater, error) {
|
||||
eventSource := make(chan kubeletnotifier.Info)
|
||||
if args.KubeletStateDir != "" {
|
||||
ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go ntf.Run()
|
||||
}
|
||||
nfd := &nfdTopologyUpdater{
|
||||
args: args,
|
||||
resourcemonitorArgs: resourcemonitorArgs,
|
||||
nodeInfo: newStaticNodeInfo(policy, scope),
|
||||
stop: make(chan struct{}, 1),
|
||||
eventSource: eventSource,
|
||||
config: &NFDConfig{},
|
||||
}
|
||||
if args.ConfigFile != "" {
|
||||
nfd.configFilePath = filepath.Clean(args.ConfigFile)
|
||||
}
|
||||
return nfd
|
||||
return nfd, nil
|
||||
}
|
||||
|
||||
// Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after
|
||||
|
@ -149,19 +160,18 @@ func (w *nfdTopologyUpdater) Run() error {
|
|||
|
||||
klog.V(2).Infof("resAggr is: %v\n", resAggr)
|
||||
|
||||
crTrigger := time.NewTicker(w.resourcemonitorArgs.SleepInterval)
|
||||
for {
|
||||
select {
|
||||
case <-crTrigger.C:
|
||||
klog.Infof("Scanning")
|
||||
case info := <-w.eventSource:
|
||||
klog.V(4).Infof("got %q event. scanning...", info.Event)
|
||||
scanResponse, err := resScan.Scan()
|
||||
utils.KlogDump(1, "podResources are", " ", scanResponse.PodResources)
|
||||
if err != nil {
|
||||
klog.Warningf("Scan failed: %v", err)
|
||||
klog.Warningf("scan failed: %v", err)
|
||||
continue
|
||||
}
|
||||
zones = resAggr.Aggregate(scanResponse.PodResources)
|
||||
utils.KlogDump(1, "After aggregating resources identified zones are", " ", zones)
|
||||
utils.KlogDump(1, "after aggregating resources identified zones are", " ", zones)
|
||||
if !w.args.NoPublish {
|
||||
if err = w.updateNodeResourceTopology(zones, scanResponse); err != nil {
|
||||
return err
|
||||
|
|
|
@ -21,8 +21,6 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
@ -34,6 +32,7 @@ import (
|
|||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
|
@ -115,8 +114,7 @@ var _ = SIGDescribe("NFD topology updater", func() {
|
|||
|
||||
kcfg := cfg.GetKubeletConfig()
|
||||
By(fmt.Sprintf("Using config (%#v)", kcfg))
|
||||
|
||||
podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage())}
|
||||
podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage()), testpod.SpecWithContainerExtraArgs("-sleep-interval=3s")}
|
||||
topologyUpdaterDaemonSet = testds.NFDTopologyUpdater(kcfg, podSpecOpts...)
|
||||
})
|
||||
|
||||
|
@ -184,7 +182,7 @@ var _ = SIGDescribe("NFD topology updater", func() {
|
|||
|
||||
cooldown := 30 * time.Second
|
||||
By(fmt.Sprintf("getting the updated topology - sleeping for %v", cooldown))
|
||||
// the object, hance the resource version must NOT change, so we can only sleep
|
||||
// the object, hence the resource version must NOT change, so we can only sleep
|
||||
time.Sleep(cooldown)
|
||||
By("checking the changes in the updated topology - expecting none")
|
||||
finalNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
|
||||
|
@ -263,7 +261,90 @@ var _ = SIGDescribe("NFD topology updater", func() {
|
|||
return true
|
||||
}, time.Minute, 5*time.Second).Should(BeTrue(), "didn't get updated node topology info")
|
||||
})
|
||||
})
|
||||
|
||||
When("sleep interval disabled", func() {
|
||||
ginkgo.BeforeEach(func() {
|
||||
cfg, err := testutils.GetConfig()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
kcfg := cfg.GetKubeletConfig()
|
||||
By(fmt.Sprintf("Using config (%#v)", kcfg))
|
||||
podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage()), testpod.SpecWithContainerExtraArgs("-sleep-interval=0s")}
|
||||
topologyUpdaterDaemonSet = testds.NFDTopologyUpdater(kcfg, podSpecOpts...)
|
||||
})
|
||||
It("should still create CRs using a reactive updates", func() {
|
||||
nodes, err := testutils.FilterNodesWithEnoughCores(workerNodes, "1000m")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if len(nodes) < 1 {
|
||||
Skip("not enough allocatable cores for this test")
|
||||
}
|
||||
|
||||
By("creating a pod consuming exclusive CPUs")
|
||||
sleeperPod := testpod.GuaranteedSleeper(testpod.WithLimits(
|
||||
corev1.ResourceList{
|
||||
corev1.ResourceCPU: resource.MustParse("1000m"),
|
||||
// any random reasonable amount is fine
|
||||
corev1.ResourceMemory: resource.MustParse("100Mi"),
|
||||
}))
|
||||
// in case there is more than a single node in the cluster
|
||||
// we need to set the node name, so we'll have certainty about
|
||||
// which node we need to examine
|
||||
sleeperPod.Spec.NodeName = topologyUpdaterNode.Name
|
||||
|
||||
podMap := make(map[string]*corev1.Pod)
|
||||
pod := e2epod.NewPodClient(f).CreateSync(sleeperPod)
|
||||
podMap[pod.Name] = pod
|
||||
defer testpod.DeleteAsync(f, podMap)
|
||||
|
||||
By("checking initial CR created")
|
||||
initialNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
|
||||
|
||||
By("creating additional pod consuming exclusive CPUs")
|
||||
sleeperPod2 := testpod.GuaranteedSleeper(testpod.WithLimits(
|
||||
corev1.ResourceList{
|
||||
corev1.ResourceCPU: resource.MustParse("1000m"),
|
||||
// any random reasonable amount is fine
|
||||
corev1.ResourceMemory: resource.MustParse("100Mi"),
|
||||
}))
|
||||
|
||||
// in case there is more than a single node in the cluster
|
||||
// we need to set the node name, so we'll have certainty about
|
||||
// which node we need to examine
|
||||
sleeperPod2.Spec.NodeName = topologyUpdaterNode.Name
|
||||
sleeperPod2.Name = sleeperPod2.Name + "2"
|
||||
pod2 := e2epod.NewPodClient(f).CreateSync(sleeperPod2)
|
||||
podMap[pod.Name] = pod2
|
||||
|
||||
By("checking the changes in the updated topology")
|
||||
var finalNodeTopo *v1alpha2.NodeResourceTopology
|
||||
Eventually(func() bool {
|
||||
finalNodeTopo, err = topologyClient.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), topologyUpdaterNode.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
framework.Logf("failed to get the node topology resource: %v", err)
|
||||
return false
|
||||
}
|
||||
if finalNodeTopo.ObjectMeta.ResourceVersion == initialNodeTopo.ObjectMeta.ResourceVersion {
|
||||
framework.Logf("node topology resource %s was not updated", topologyUpdaterNode.Name)
|
||||
}
|
||||
|
||||
initialAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(initialNodeTopo)
|
||||
finalAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(finalNodeTopo)
|
||||
if len(initialAllocRes) == 0 || len(finalAllocRes) == 0 {
|
||||
Fail(fmt.Sprintf("failed to find allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes))
|
||||
}
|
||||
|
||||
zoneName, resName, isLess := lessAllocatableResources(initialAllocRes, finalAllocRes)
|
||||
framework.Logf("zone=%q resource=%q isLess=%v", zoneName, resName, isLess)
|
||||
if !isLess {
|
||||
framework.Logf("final allocatable resources not decreased - initial=%v final=%v", initialAllocRes, finalAllocRes)
|
||||
}
|
||||
return true
|
||||
// timeout must be lower than sleep interval
|
||||
// otherwise we won't be able to determine what
|
||||
// triggered the CR update
|
||||
}, time.Second*20, 5*time.Second).Should(BeTrue(), "didn't get updated node topology info")
|
||||
})
|
||||
})
|
||||
|
||||
When("topology-updater configure to exclude memory", func() {
|
||||
|
|
|
@ -364,8 +364,7 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1.
|
|||
Command: []string{"nfd-topology-updater"},
|
||||
Args: []string{
|
||||
"-kubelet-config-uri=file:///podresources/config.yaml",
|
||||
"-podresources-socket=unix:///podresources/kubelet.sock",
|
||||
"-sleep-interval=3s",
|
||||
"-podresources-socket=unix:///host-var/lib/kubelet/pod-resources/kubelet.sock",
|
||||
"-watch-namespace=rte"},
|
||||
Env: []corev1.EnvVar{
|
||||
{
|
||||
|
@ -389,13 +388,17 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1.
|
|||
},
|
||||
},
|
||||
VolumeMounts: []corev1.VolumeMount{
|
||||
{
|
||||
Name: "kubelet-state-files",
|
||||
MountPath: "/host-var/lib/kubelet",
|
||||
},
|
||||
{
|
||||
Name: "kubelet-podresources-conf",
|
||||
MountPath: "/podresources/config.yaml",
|
||||
},
|
||||
{
|
||||
Name: "kubelet-podresources-sock",
|
||||
MountPath: "/podresources/kubelet.sock",
|
||||
MountPath: "/host-var/lib/kubelet/pod-resources/kubelet.sock",
|
||||
},
|
||||
{
|
||||
Name: "host-sys",
|
||||
|
@ -407,6 +410,15 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1.
|
|||
ServiceAccountName: "nfd-topology-updater-e2e",
|
||||
DNSPolicy: corev1.DNSClusterFirstWithHostNet,
|
||||
Volumes: []corev1.Volume{
|
||||
{
|
||||
Name: "kubelet-state-files",
|
||||
VolumeSource: corev1.VolumeSource{
|
||||
HostPath: &corev1.HostPathVolumeSource{
|
||||
Path: "/var/lib/kubelet",
|
||||
Type: newHostPathType(corev1.HostPathDirectory),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "kubelet-podresources-conf",
|
||||
VolumeSource: corev1.VolumeSource{
|
||||
|
|
Loading…
Add table
Reference in a new issue