mirror of
synced 2024-12-14 11:57:51 +00:00
topologyupdater: watch/consider only guaranteed pods for accounting
- Files obtained after running make mock - Run `go get github.com/vektra/mockery` and make sure that mockery is in your $PATH - run `make mock` Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
This commit is contained in:
9 changed files with 117 additions and 17 deletions
@ -103,6 +103,8 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
"Update once and exit")
flagset.BoolVar(&args.NoPublish, "no-publish", false,
"Do not publish discovered features to the cluster-local Kubernetes API server.")
flagset.StringVar(&args.KubeConfigFile, "kubeconfig", "",
"Kube config file.")
flagset.DurationVar(&resourcemonitorArgs.SleepInterval, "sleep-interval", time.Duration(60)*time.Second,
"Time to sleep between CR updates. Non-positive value implies no CR updatation (i.e. infinite sleep). [Default: 60s]")
flagset.StringVar(&resourcemonitorArgs.Namespace, "watch-namespace", "*",
@ -44,4 +44,7 @@ type APIHelpers interface {
// GetTopologyClient returns a topologyclientset
GetTopologyClient() (*topologyclientset.Clientset, error)
// GetPod returns the Kubernetes pod in a namepace with a name.
GetPod(*k8sclient.Clientset, string, string) (*api.Pod, error)
@ -122,3 +122,13 @@ func (h K8sHelpers) PatchNodeStatus(c *k8sclient.Clientset, nodeName string, pat
return nil
func (h K8sHelpers) GetPod(cli *k8sclient.Clientset, namespace string, podName string) (*api.Pod, error) {
// Get the node object using pod name
pod, err := cli.CoreV1().Pods(namespace).Get(context.TODO(), podName, meta_v1.GetOptions{})
if err != nil {
return nil, err
return pod, nil
@ -1,4 +1,4 @@
// Code generated by mockery v2.4.0-beta. DO NOT EDIT.
// Code generated by mockery v1.0.0. DO NOT EDIT.
// Re-generate by running 'make mock'
@ -87,6 +87,29 @@ func (_m *MockAPIHelpers) GetNodes(_a0 *kubernetes.Clientset) (*v1.NodeList, err
return r0, r1
// GetPod provides a mock function with given fields: _a0, _a1, _a2
func (_m *MockAPIHelpers) GetPod(_a0 *kubernetes.Clientset, _a1 string, _a2 string) (*v1.Pod, error) {
ret := _m.Called(_a0, _a1, _a2)
var r0 *v1.Pod
if rf, ok := ret.Get(0).(func(*kubernetes.Clientset, string, string) *v1.Pod); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v1.Pod)
var r1 error
if rf, ok := ret.Get(1).(func(*kubernetes.Clientset, string, string) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
return r0, r1
// GetTopologyClient provides a mock function with given fields:
func (_m *MockAPIHelpers) GetTopologyClient() (*versioned.Clientset, error) {
ret := _m.Called()
@ -1,4 +1,4 @@
// Code generated by mockery v2.4.0-beta. DO NOT EDIT.
// Code generated by mockery v1.0.0. DO NOT EDIT.
// Re-generate by running 'make mock'
@ -25,6 +25,7 @@ import (
v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
@ -36,8 +37,9 @@ import (
// Command line arguments
type Args struct {
NoPublish bool
Oneshot bool
NoPublish bool
Oneshot bool
KubeConfigFile string
type NfdTopologyUpdater interface {
@ -86,13 +88,17 @@ func (w *nfdTopologyUpdater) Run() error {
podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath)
if err != nil {
klog.Fatalf("Failed to get PodResource Client: %v", err)
klog.Fatalf("failed to get PodResource Client: %w", err)
return err
kubeApihelper := apihelper.K8sHelpers{Kubeconfig: w.args.KubeConfigFile}
var resScan resourcemonitor.ResourcesScanner
resScan, err = resourcemonitor.NewPodResourcesScanner(w.resourcemonitorArgs.Namespace, podResClient)
resScan, err = resourcemonitor.NewPodResourcesScanner(w.resourcemonitorArgs.Namespace, podResClient, kubeApihelper)
if err != nil {
klog.Fatalf("Failed to initialize ResourceMonitor instance: %v", err)
klog.Fatalf("failed to initialize ResourceMonitor instance: %w", err)
return err
// CAUTION: these resources are expected to change rarely - if ever.
@ -103,7 +109,7 @@ func (w *nfdTopologyUpdater) Run() error {
resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient)
if err != nil {
klog.Fatalf("Failed to obtain node resource information: %v", err)
klog.Fatalf("failed to obtain node resource information: %w", err)
return err
@ -23,17 +23,22 @@ import (
v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
type PodResourcesScanner struct {
namespace string
podResourceClient podresourcesapi.PodResourcesListerClient
apihelper apihelper.APIHelpers
func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi.PodResourcesListerClient) (ResourcesScanner, error) {
func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi.PodResourcesListerClient, kubeApihelper apihelper.APIHelpers) (ResourcesScanner, error) {
resourcemonitorInstance := &PodResourcesScanner{
namespace: namespace,
podResourceClient: podResourceClient,
apihelper: kubeApihelper,
if resourcemonitorInstance.namespace != "*" {
klog.Infof("watching namespace %q", resourcemonitorInstance.namespace)
@ -45,12 +50,56 @@ func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi.
// isWatchable tells if the the given namespace should be watched.
func (resMon *PodResourcesScanner) isWatchable(podNamespace string) bool {
if resMon.namespace == "*" {
return true
func (resMon *PodResourcesScanner) isWatchable(podNamespace string, podName string) (bool, error) {
cli, err := resMon.apihelper.GetClient()
if err != nil {
return false, err
pod, err := resMon.apihelper.GetPod(cli, podNamespace, podName)
if err != nil {
return false, err
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
return false, nil
if resMon.namespace == "*" && hasExclusiveCPUs(pod) {
return true, nil
// TODO: add an explicit check for guaranteed pods
return resMon.namespace == podNamespace
return resMon.namespace == podNamespace && hasExclusiveCPUs(pod), nil
// hasExclusiveCPUs returns true if a guranteed pod is allocated exclusive CPUs else returns false.
// In isWatchable() function we check for the pod QoS and proceed if it is guaranteed (i.e. request == limit)
// and hence we only check for request in the function below.
func hasExclusiveCPUs(pod *v1.Pod) bool {
for _, container := range pod.Spec.InitContainers {
if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
isInitContainerGuaranteed := hasIntegralCPUs(pod, &container)
if !isInitContainerGuaranteed {
return false
for _, container := range pod.Spec.Containers {
if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
isAppContainerGuaranteed := hasIntegralCPUs(pod, &container)
if !isAppContainerGuaranteed {
return false
return true
// hasIntegralCPUs returns true if a container in pod is requesting integral CPUs else returns false
func hasIntegralCPUs(pod *v1.Pod, container *v1.Container) bool {
cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
return cpuQuantity.Value()*1000 == cpuQuantity.MilliValue()
// Scan gathers all the PodResources from the system, using the podresources API client.
@ -67,7 +116,11 @@ func (resMon *PodResourcesScanner) Scan() ([]PodResources, error) {
var podResData []PodResources
for _, podResource := range resp.GetPodResources() {
if !resMon.isWatchable(podResource.GetNamespace()) {
isWatchable, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName())
if err != nil {
return nil, fmt.Errorf("checking if pod in a namespace is watchable, namespace:%v, pod name %v: %v", podResource.GetNamespace(), podResource.GetName(), err)
if !isWatchable {
@ -26,6 +26,7 @@ import (
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
@ -36,7 +37,8 @@ func TestPodScanner(t *testing.T) {
Convey("When I scan for pod resources using fake client and no namespace", t, func() {
mockPodResClient := new(podres.MockPodResourcesListerClient)
resScan, err = NewPodResourcesScanner("*", mockPodResClient)
mockAPIHelper := new(apihelper.MockAPIHelpers)
resScan, err = NewPodResourcesScanner("*", mockPodResClient, mockAPIHelper)
Convey("Creating a Resources Scanner using a mock client", func() {
So(err, ShouldBeNil)
@ -309,7 +311,8 @@ func TestPodScanner(t *testing.T) {
Convey("When I scan for pod resources using fake client and given namespace", t, func() {
mockPodResClient := new(podres.MockPodResourcesListerClient)
resScan, err = NewPodResourcesScanner("pod-res-test", mockPodResClient)
mockAPIHelper := new(apihelper.MockAPIHelpers)
resScan, err = NewPodResourcesScanner("pod-res-test", mockPodResClient, mockAPIHelper)
Convey("Creating a Resources Scanner using a mock client", func() {
So(err, ShouldBeNil)
@ -1,4 +1,4 @@
// Code generated by mockery v2.4.0-beta. DO NOT EDIT.
// Code generated by mockery v1.0.0. DO NOT EDIT.
// Re-generate by running 'make mock'
Reference in a new issue