
topologyupdater: Bootstrap nfd-topology-updater in NFD

- This patch exposes resource hardware topology information through CRDs
  in Node Feature Discovery.
- To do this, we introduce a new software component, nfd-topology-updater,
  alongside the existing nfd-master and nfd-worker components.
- nfd-master is enhanced to communicate with nfd-topology-updater over gRPC
  and then to create, for each node in the cluster, a CR exposing that
  node's resource hardware topology information.
- Pin the kubernetes dependency to one that includes the pod resources
  implementation.
- This code obtains hardware information from the system as well as pod
  resource information from the Pod Resources API in order to determine
  the allocatable resource information for each NUMA zone. This
  information, along with costs for NUMA zones (obtained by reading NUMA
  distances), is gathered by nfd-topology-updater running on all the nodes
  of the cluster and propagated to nfd-master so that it can be populated
  in the CRs corresponding to the nodes.
- We use GHW facilities for obtaining system information like CPUs,
  topology, and NUMA distances.
- This also includes updates to the Makefile, Dockerfile, and manifests
  for deploying nfd-topology-updater.
- This patch includes unit tests.
- As part of the Topology Aware Scheduling work, this patch captures the
  configured Topology Manager scope in addition to the Topology Manager
  policy. Based on the values of both attributes, a single string is
  populated to the CRD. The string value will be one of
  {SingleNUMANodeContainerLevel, SingleNUMANodePodLevel, BestEffort,
  Restricted, None} (see the sketch below).
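
The policy/scope folding lives in the new pkg/topologypolicy package, which
is part of this commit but not of the hunks shown below. A minimal sketch of
the mapping, assuming the kubelet's documented policy values
("single-numa-node", "restricted", "best-effort", "none") and scope values
("pod", "container"); this is a reconstruction from the values named above,
not the actual pkg/topologypolicy code:

package topologypolicy

// DetectTopologyPolicy folds the kubelet Topology Manager policy and scope
// into one of the CRD strings listed in the commit message. Sketch only:
// reconstructed from the values named above.
func DetectTopologyPolicy(policy, scope string) string {
	switch policy {
	case "single-numa-node":
		// The scope only differentiates the single-numa-node policy.
		if scope == "pod" {
			return "SingleNUMANodePodLevel"
		}
		return "SingleNUMANodeContainerLevel"
	case "restricted":
		return "Restricted"
	case "best-effort":
		return "BestEffort"
	default:
		return "None"
	}
}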

Co-Authored-by: Artyom Lukianov <alukiano@redhat.com>
Co-Authored-by: Francesco Romani <fromani@redhat.com>
Co-Authored-by: Talor Itzhak <titzhak@redhat.com>
Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
Francesco Romani 2021-05-13 12:55:33 +02:00 committed by Swati Sehgal
parent 00cc07da76
commit b4c92e4eed
23 changed files with 2478 additions and 28 deletions


@@ -105,6 +105,9 @@ func initFlags(flagset *flag.FlagSet) *master.Args {
flagset.BoolVar(&args.VerifyNodeName, "verify-node-name", false,
"Verify worker node name against the worker's TLS certificate. "+
"Only takes effect when TLS authentication has been enabled.")
flagset.StringVar(&args.NRTNamespace, "nrt-namespace", "default",
"Namespace in which Node Resource Topology CRs are created. "+
"Ensure that the specified namespace already exists.")
return args
}


@@ -0,0 +1,128 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"os"
"time"
"k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/kubeconf"
topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/topology-updater"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/node-feature-discovery/source"
)
const (
// ProgramName is the canonical name of this program
ProgramName = "nfd-topology-updater"
)
func main() {
flags := flag.NewFlagSet(ProgramName, flag.ExitOnError)
printVersion := flags.Bool("version", false, "Print version and exit.")
args, resourcemonitorArgs := parseArgs(flags, os.Args[1:]...)
if *printVersion {
fmt.Println(ProgramName, version.Get())
os.Exit(0)
}
// Assert that the version is known
if version.Undefined() {
klog.Warningf("version not set! Set -ldflags \"-X sigs.k8s.io/node-feature-discovery/pkg/version.version=`git describe --tags --dirty --always`\" during build or run.")
}
// Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog()
klConfig, err := kubeconf.GetKubeletConfigFromLocalFile(resourcemonitorArgs.KubeletConfigFile)
if err != nil {
klog.Fatalf("error reading kubelet config: %v", err)
}
tmPolicy := string(topologypolicy.DetectTopologyPolicy(klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope))
klog.Infof("detected kubelet Topology Manager policy %q", tmPolicy)
// Get new TopologyUpdater instance
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, tmPolicy)
if err != nil {
klog.Exitf("failed to initialize TopologyUpdater instance: %v", err)
}
if err = instance.Run(); err != nil {
klog.Exit(err)
}
}
func parseArgs(flags *flag.FlagSet, osArgs ...string) (*topology.Args, *resourcemonitor.Args) {
args, resourcemonitorArgs := initFlags(flags)
_ = flags.Parse(osArgs)
if len(flags.Args()) > 0 {
fmt.Fprintf(flags.Output(), "unknown command line argument: %s\n", flags.Args()[0])
flags.Usage()
os.Exit(2)
}
return args, resourcemonitorArgs
}
func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
args := &topology.Args{}
resourcemonitorArgs := &resourcemonitor.Args{}
flagset.StringVar(&args.CaFile, "ca-file", "",
"Root certificate for verifying connections")
flagset.StringVar(&args.CertFile, "cert-file", "",
"Certificate used for authenticating connections")
flagset.StringVar(&args.KeyFile, "key-file", "",
"Private key matching -cert-file")
flagset.BoolVar(&args.Oneshot, "oneshot", false,
"Update once and exit")
flagset.BoolVar(&args.NoPublish, "no-publish", false,
"Do not publish discovered features to the cluster-local Kubernetes API server.")
flagset.DurationVar(&resourcemonitorArgs.SleepInterval, "sleep-interval", time.Duration(60)*time.Second,
"Time to sleep between CR updates. Non-positive value implies no CR updatation (i.e. infinite sleep). [Default: 60s]")
flagset.StringVar(&resourcemonitorArgs.Namespace, "watch-namespace", "*",
"Namespace to watch pods (for testing/debugging purpose). Use * for all namespaces.")
flagset.StringVar(&resourcemonitorArgs.KubeletConfigFile, "kubelet-config-file", source.VarDir.Path("lib/kubelet/config.yaml"),
"Kubelet config file path.")
flagset.StringVar(&resourcemonitorArgs.PodResourceSocketPath, "podresources-socket", source.VarDir.Path("lib/kubelet/pod-resources/kubelet.sock"),
"Pod Resource Socket path to use.")
flagset.StringVar(&args.Server, "server", "localhost:8080",
"NFD server address to connecto to.")
flagset.StringVar(&args.ServerNameOverride, "server-name-override", "",
"Hostname expected from server certificate, useful in testing")
initKlogFlags(flagset)
return args, resourcemonitorArgs
}
func initKlogFlags(flagset *flag.FlagSet) {
klog.InitFlags(flagset)
}


@@ -0,0 +1,104 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestArgsParse(t *testing.T) {
Convey("When parsing command line arguments", t, func() {
flags := flag.NewFlagSet(ProgramName, flag.ExitOnError)
Convey("When --no-publish and --oneshot flags are passed", func() {
args, finderArgs := parseArgs(flags, "--oneshot", "--no-publish")
Convey("noPublish is set and args.sources is set to the default value", func() {
So(args.NoPublish, ShouldBeTrue)
So(args.Oneshot, ShouldBeTrue)
So(finderArgs.SleepInterval, ShouldEqual, 60*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/var/lib/kubelet/config.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})
Convey("When valid args are specified for --kubelet-config-file and --sleep-interval,", func() {
args, finderArgs := parseArgs(flags,
"--kubelet-config-file=/path/testconfig.yaml",
"--sleep-interval=30s")
Convey("args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeFalse)
So(args.Oneshot, ShouldBeFalse)
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/path/testconfig.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})
Convey("When valid args are specified for --podresources-socket flag and --sleep-interval is specified", func() {
args, finderArgs := parseArgs(flags,
"--podresources-socket=/path/testkubelet.sock",
"--sleep-interval=30s")
Convey("args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeFalse)
So(args.Oneshot, ShouldBeFalse)
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/var/lib/kubelet/config.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/path/testkubelet.sock")
})
})
Convey("When valid args are specified for--sysfs and --sleep-inteval is specified", func() {
args, finderArgs := parseArgs(flags,
"--sleep-interval=30s")
Convey("args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeFalse)
So(args.Oneshot, ShouldBeFalse)
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/var/lib/kubelet/config.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})
Convey("When All valid args are specified", func() {
args, finderArgs := parseArgs(flags,
"--no-publish",
"--sleep-interval=30s",
"--kubelet-config-file=/path/testconfig.yaml",
"--podresources-socket=/path/testkubelet.sock",
"--ca-file=ca",
"--cert-file=crt",
"--key-file=key")
Convey("--no-publish is set and args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeTrue)
So(args.CaFile, ShouldEqual, "ca")
So(args.CertFile, ShouldEqual, "crt")
So(args.KeyFile, ShouldEqual, "key")
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/path/testconfig.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/path/testkubelet.sock")
})
})
})
}

go.mod

@@ -4,7 +4,10 @@ go 1.16
require (
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.5
github.com/jaypipes/ghw v0.8.1-0.20210827132705-c7224150a17e
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.0.9
github.com/klauspost/cpuid/v2 v2.0.9
github.com/onsi/ginkgo v1.14.0
@@ -20,6 +23,7 @@ require (
k8s.io/apimachinery v0.22.0
k8s.io/client-go v0.22.0
k8s.io/klog/v2 v2.9.0
k8s.io/kubelet v0.0.0
k8s.io/kubernetes v1.22.0
sigs.k8s.io/yaml v1.2.0
)

go.sum

@@ -72,6 +72,8 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -229,6 +231,7 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
@@ -245,6 +248,8 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v0.4.0 h1:K7/B1jt6fIBQVd4Owv2MqGQClcgf0R266+7C/QjRcLc=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
@@ -385,6 +390,11 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5/go.mod h1:DM4VvS+hD/kDi1U1QsX2fnZowwBhqD0Dk3bRPKF/Oc8=
github.com/jaypipes/ghw v0.8.1-0.20210827132705-c7224150a17e h1:XTXPzmyiwx2uxk8JaU4mxmBZ+rzZtmEwkNm9H9ETzV0=
github.com/jaypipes/ghw v0.8.1-0.20210827132705-c7224150a17e/go.mod h1:+gR9bjm3W/HnFi90liF+Fj9GpCe/Dsibl9Im8KmC7c4=
github.com/jaypipes/pcidb v0.6.0 h1:VIM7GKVaW4qba30cvB67xSCgJPTzkG8Kzw/cbs5PHWU=
github.com/jaypipes/pcidb v0.6.0/go.mod h1:L2RGk04sfRhp5wvHO0gfRAMoLY/F3PKv/nwJeVoho0o=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
@@ -454,6 +464,7 @@ github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible h1
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-wordwrap v1.0.0 h1:6GlHJ/LTGMrIJbwgdqdl2eEH8o+Exx/0m8ir9Gns0u4=
@@ -603,11 +614,13 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/cobra v1.1.3 h1:xghbfqPkxzxP3C/f3n5DdpAbdKLj4ZE4BWQI362l53M=
github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
@@ -1074,6 +1087,8 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqpnJdvzuiHsl/dnxl11M=
howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0=
k8s.io/api v0.22.0 h1:elCpMZ9UE8dLdYxr55E06TmSeji9I3KH494qH70/y+c=
k8s.io/api v0.22.0/go.mod h1:0AoXXqst47OI/L0oGKq9DG61dvGRPXs7X4/B7KyjBCU=
k8s.io/apiextensions-apiserver v0.22.0 h1:QTuZIQggaE7N8FTjur+1zxLmEPziphK7nNm8t+VNO3g=


@@ -0,0 +1,39 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeconf
import (
"io/ioutil"
"github.com/ghodss/yaml"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
)
// GetKubeletConfigFromLocalFile returns KubeletConfiguration loaded from the node local config
func GetKubeletConfigFromLocalFile(kubeletConfigPath string) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
kubeletBytes, err := ioutil.ReadFile(kubeletConfigPath)
if err != nil {
return nil, err
}
kubeletConfig := &kubeletconfigv1beta1.KubeletConfiguration{}
if err := yaml.Unmarshal(kubeletBytes, kubeletConfig); err != nil {
return nil, err
}
return kubeletConfig, nil
}


@@ -0,0 +1,46 @@
/*
Copyright 2019-2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeconf
import (
"path/filepath"
"testing"
)
type testCaseData struct {
path string
tmPolicy string
}
func TestGetKubeletConfigFromLocalFile(t *testing.T) {
tCases := []testCaseData{
{
path: filepath.Join("..", "..", "test", "data", "kubeletconf.yaml"),
tmPolicy: "single-numa-node",
},
}
for _, tCase := range tCases {
cfg, err := GetKubeletConfigFromLocalFile(tCase.path)
if err != nil {
t.Errorf("failed to read config from %q: %v", tCase.path, err)
}
if cfg.TopologyManagerPolicy != tCase.tmPolicy {
t.Errorf("TM policy mismatch, found %q expected %q", cfg.TopologyManagerPolicy, tCase.tmPolicy)
}
}
}


@@ -55,7 +55,11 @@ type Args struct {
Klog map[string]*utils.KlogFlagVal
}
var nodeName string
func init() {
nodeName = os.Getenv("NODE_NAME")
}
// NodeName returns the name of the k8s node we're running on.
func NodeName() string { return nodeName }


@@ -0,0 +1,260 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologyupdater
import (
"fmt"
"time"
"k8s.io/klog/v2"
v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"golang.org/x/net/context"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/podres"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
pb "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)
// Command line arguments
type Args struct {
nfdclient.Args
NoPublish bool
Oneshot bool
}
type NfdTopologyUpdater interface {
nfdclient.NfdClient
Update(v1alpha1.ZoneList) error
}
type staticNodeInfo struct {
tmPolicy string
}
type nfdTopologyUpdater struct {
nfdclient.NfdBaseClient
nodeInfo *staticNodeInfo
args Args
resourcemonitorArgs resourcemonitor.Args
certWatch *utils.FsWatcher
client pb.NodeTopologyClient
stop chan struct{} // channel for signaling stop
}
// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy string) (NfdTopologyUpdater, error) {
base, err := nfdclient.NewNfdBaseClient(&args.Args)
if err != nil {
return nil, err
}
nfd := &nfdTopologyUpdater{
NfdBaseClient: base,
args: args,
resourcemonitorArgs: resourcemonitorArgs,
nodeInfo: &staticNodeInfo{
tmPolicy: policy,
},
stop: make(chan struct{}, 1),
}
return nfd, nil
}
// Run nfdTopologyUpdater client. Returns if a fatal error is encountered, or, after
// one request, if Oneshot is set to 'true' in the updater args.
func (w *nfdTopologyUpdater) Run() error {
klog.Infof("Node Feature Discovery Topology Updater %s", version.Get())
klog.Infof("NodeName: '%s'", nfdclient.NodeName())
podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath)
if err != nil {
klog.Fatalf("Failed to get PodResource Client: %v", err)
}
var resScan resourcemonitor.ResourcesScanner
resScan, err = resourcemonitor.NewPodResourcesScanner(w.resourcemonitorArgs.Namespace, podResClient)
if err != nil {
klog.Fatalf("Failed to initialize ResourceMonitor instance: %v", err)
}
// CAUTION: these resources are expected to change rarely - if ever.
// So we intentionally do this only once during the process lifecycle.
// TODO: Obtain node resources dynamically from the podresource API
var zones v1alpha1.ZoneList
resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient)
if err != nil {
klog.Fatalf("Failed to obtain node resource information: %v", err)
return err
}
klog.V(2).Infof("resAggr is: %v\n", resAggr)
// Create watcher for TLS certificates
w.certWatch, err = utils.CreateFsWatcher(time.Second, w.args.CaFile, w.args.CertFile, w.args.KeyFile)
if err != nil {
return err
}
crTrigger := time.After(0)
for {
select {
case <-crTrigger:
klog.Infof("Scanning\n")
podResources, err := resScan.Scan()
utils.KlogDump(1, "podResources are", " ", podResources)
if err != nil {
klog.Warningf("Scan failed: %v\n", err)
continue
}
zones = resAggr.Aggregate(podResources)
utils.KlogDump(1, "After aggregating resources identified zones are", " ", zones)
if err = w.Update(zones); err != nil {
return err
}
if w.args.Oneshot {
return nil
}
if w.resourcemonitorArgs.SleepInterval > 0 {
crTrigger = time.After(w.resourcemonitorArgs.SleepInterval)
}
case <-w.certWatch.Events:
klog.Infof("TLS certificate update, renewing connection to nfd-master")
w.Disconnect()
if err := w.Connect(); err != nil {
return err
}
case <-w.stop:
klog.Infof("shutting down nfd-topology-updater")
w.certWatch.Close()
return nil
}
}
}
func (w *nfdTopologyUpdater) Update(zones v1alpha1.ZoneList) error {
// Connect to NFD master
err := w.Connect()
if err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
defer w.Disconnect()
if w.client == nil {
return nil
}
err = advertiseNodeTopology(w.client, zones, w.nodeInfo.tmPolicy, nfdclient.NodeName())
if err != nil {
return fmt.Errorf("failed to advertise node topology: %w", err)
}
return nil
}
// Stop NFD Topology Updater
func (w *nfdTopologyUpdater) Stop() {
select {
case w.stop <- struct{}{}:
default:
}
}
// Connect creates a client connection to the NFD master
func (w *nfdTopologyUpdater) Connect() error {
// Return a dummy connection in case of dry-run
if w.args.NoPublish {
return nil
}
if err := w.NfdBaseClient.Connect(); err != nil {
return err
}
w.client = pb.NewNodeTopologyClient(w.ClientConn())
return nil
}
// Disconnect closes the connection to the NFD master
func (w *nfdTopologyUpdater) Disconnect() {
w.NfdBaseClient.Disconnect()
w.client = nil
}
// advertiseNodeTopology sends this node's zone information to nfd-master,
// which creates or updates the NodeResourceTopology CR for the node.
func advertiseNodeTopology(client pb.NodeTopologyClient, zoneInfo v1alpha1.ZoneList, tmPolicy string, nodeName string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
zones := make([]*pb.Zone, len(zoneInfo))
// TODO: Avoid copying of data to allow returning the zone info
// directly in a compatible data type (i.e. []*v1alpha1.Zone).
for i, zone := range zoneInfo {
resInfo := make([]*pb.ResourceInfo, len(zone.Resources))
for j, info := range zone.Resources {
resInfo[j] = &pb.ResourceInfo{
Name: info.Name,
Allocatable: info.Allocatable.String(),
Capacity: info.Capacity.String(),
}
}
zones[i] = &pb.Zone{
Name: zone.Name,
Type: zone.Type,
Parent: zone.Parent,
Resources: resInfo,
Costs: updateMap(zone.Costs),
}
}
topologyReq := &pb.NodeTopologyRequest{
Zones: zones,
NfdVersion: version.Get(),
NodeName: nodeName,
TopologyPolicies: []string{tmPolicy},
}
utils.KlogDump(1, "Sending NodeTopologyRequest to nfd-master:", " ", topologyReq)
_, err := client.UpdateNodeTopology(ctx, topologyReq)
if err != nil {
return err
}
return nil
}
// updateMap converts a slice of v1alpha1.CostInfo into its protobuf representation.
func updateMap(data []v1alpha1.CostInfo) []*pb.CostInfo {
ret := make([]*pb.CostInfo, len(data))
for i, cost := range data {
ret[i] = &pb.CostInfo{
Name: cost.Name,
Value: int32(cost.Value),
}
}
return ret
}
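
The generated pkg/topologyupdater protobuf code is part of this commit but
not of the hunks shown here. A hedged sketch of the client surface implied
by the calls above, reconstructed from usage (not the actual generated
file):

package topologyupdater

import (
	"context"

	"google.golang.org/grpc"
)

// NodeTopologyClient is the client-side surface of the NodeTopology gRPC
// service that nfd-master registers in this patch.
type NodeTopologyClient interface {
	UpdateNodeTopology(ctx context.Context, in *NodeTopologyRequest, opts ...grpc.CallOption) (*NodeTopologyResponse, error)
}

// NodeTopologyRequest carries one node's NUMA zones and topology policies.
type NodeTopologyRequest struct {
	NfdVersion       string
	NodeName         string
	TopologyPolicies []string
	Zones            []*Zone
}

// Zone describes a NUMA zone: its resources and the costs to reach peers.
type Zone struct {
	Name      string
	Type      string
	Parent    string
	Resources []*ResourceInfo
	Costs     []*CostInfo
}

type ResourceInfo struct {
	Name        string
	Allocatable string
	Capacity    string
}

type CostInfo struct {
	Name  string
	Value int32
}

type NodeTopologyResponse struct{}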


@@ -0,0 +1,167 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologyupdater_test
import (
"fmt"
"os"
"testing"
"time"
v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
. "github.com/smartystreets/goconvey/convey"
"k8s.io/apimachinery/pkg/util/intstr"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
u "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/topology-updater"
nfdmaster "sigs.k8s.io/node-feature-discovery/pkg/nfd-master"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/test/data"
)
type testContext struct {
master nfdmaster.NfdMaster
errs chan error
}
func setupTest(args *nfdmaster.Args) testContext {
// Fixed port and no-publish, for convenience
args.NoPublish = true
args.Port = 8192
m, err := nfdmaster.NewNfdMaster(args)
if err != nil {
fmt.Printf("Test setup failed: %v\n", err)
os.Exit(1)
}
ctx := testContext{master: m, errs: make(chan error)}
// Run nfd-master instance, intended to be used as the server counterpart
go func() {
ctx.errs <- ctx.master.Run()
close(ctx.errs)
}()
ready := ctx.master.WaitForReady(time.Second)
if !ready {
fmt.Println("Test setup failed: timeout while waiting for nfd-master")
os.Exit(1)
}
return ctx
}
func teardownTest(ctx testContext) {
ctx.master.Stop()
for e := range ctx.errs {
if e != nil {
fmt.Printf("Error in test context: %v\n", e)
os.Exit(1)
}
}
}
func TestNewTopologyUpdater(t *testing.T) {
Convey("When initializing new NfdTopologyUpdater instance", t, func() {
Convey("When one of --cert-file, --key-file or --ca-file is missing", func() {
tmPolicy := "fake-topology-manager-policy"
_, err := u.NewTopologyUpdater(u.Args{Args: nfdclient.Args{CertFile: "crt", KeyFile: "key"}}, resourcemonitor.Args{}, tmPolicy)
_, err2 := u.NewTopologyUpdater(u.Args{Args: nfdclient.Args{KeyFile: "key", CaFile: "ca"}}, resourcemonitor.Args{}, tmPolicy)
_, err3 := u.NewTopologyUpdater(u.Args{Args: nfdclient.Args{CertFile: "crt", CaFile: "ca"}}, resourcemonitor.Args{}, tmPolicy)
Convey("An error should be returned", func() {
So(err, ShouldNotBeNil)
So(err2, ShouldNotBeNil)
So(err3, ShouldNotBeNil)
})
})
})
}
func TestUpdate(t *testing.T) {
ctx := setupTest(&nfdmaster.Args{})
resourceInfo := v1alpha1.ResourceInfoList{
v1alpha1.ResourceInfo{
Name: "cpu",
Allocatable: intstr.FromString("2"),
Capacity: intstr.FromString("4"),
},
}
zones := v1alpha1.ZoneList{
v1alpha1.Zone{
Name: "node-0",
Type: "Node",
Resources: resourceInfo,
},
}
defer teardownTest(ctx)
Convey("When running nfd-topology-updater against nfd-master", t, func() {
Convey("When running as a Oneshot job with Zones", func() {
args := u.Args{
Oneshot: true,
Args: nfdclient.Args{
Server: "localhost:8192"},
}
updater, _ := u.NewTopologyUpdater(args, resourcemonitor.Args{}, "fake-topology-manager-policy")
err := updater.Update(zones)
Convey("No error should be returned", func() {
So(err, ShouldBeNil)
})
})
})
}
func TestRunTls(t *testing.T) {
masterArgs := &nfdmaster.Args{
CaFile: data.FilePath("ca.crt"),
CertFile: data.FilePath("nfd-test-master.crt"),
KeyFile: data.FilePath("nfd-test-master.key"),
VerifyNodeName: false,
}
ctx := setupTest(masterArgs)
defer teardownTest(ctx)
Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() {
Convey("When publishing CRs obtained from Zones", func() {
resourceInfo := v1alpha1.ResourceInfoList{
v1alpha1.ResourceInfo{
Name: "cpu",
Allocatable: intstr.FromString("2"),
Capacity: intstr.FromString("4"),
},
}
zones := v1alpha1.ZoneList{
v1alpha1.Zone{
Name: "node-0",
Type: "Node",
Resources: resourceInfo,
},
}
updaterArgs := u.Args{
Args: nfdclient.Args{
CaFile: data.FilePath("ca.crt"),
CertFile: data.FilePath("nfd-test-topology-updater.crt"),
KeyFile: data.FilePath("nfd-test-topology-updater.key"),
Server: "localhost:8192",
ServerNameOverride: "nfd-test-master",
},
Oneshot: true,
}
updater, _ := u.NewTopologyUpdater(updaterArgs, resourcemonitor.Args{}, "fake-topology-manager-policy")
err := updater.Update(zones)
Convey("No error should be returned", func() {
So(err, ShouldBeNil)
})
})
})
}


@@ -29,17 +29,22 @@ import (
"strings"
"time"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
api "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/peer"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
pb "sigs.k8s.io/node-feature-discovery/pkg/labeler"
topologypb "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)
@@ -90,6 +95,7 @@ type Args struct {
Prune bool
VerifyNodeName bool
ResourceLabels utils.StringSetVal
NRTNamespace string
}
type NfdMaster interface {
@@ -195,6 +201,7 @@ func (m *nfdMaster) Run() error {
m.server = grpc.NewServer(serverOpts...)
pb.RegisterLabelerServer(m.server, m)
grpc_health_v1.RegisterHealthServer(m.server, health.NewServer())
topologypb.RegisterNodeTopologyServer(m.server, m)
klog.Infof("gRPC server serving on port: %d", m.args.Port)
// Run gRPC server
@@ -375,29 +382,9 @@ func verifyNodeName(cert *x509.Certificate, nodeName string) error {
// SetLabels implements LabelerServer
func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.SetLabelsReply, error) {
err := authorizeClient(c, m.args.VerifyNodeName, r.NodeName)
if err != nil {
return &pb.SetLabelsReply{}, err
}
if klog.V(1).Enabled() {
klog.Infof("REQUEST Node: %q NFD-version: %q Labels: %s", r.NodeName, r.NfdVersion, r.Labels)
@@ -421,6 +408,55 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.SetLabelsReply, error) {
return &pb.SetLabelsReply{}, nil
}
func authorizeClient(c context.Context, checkNodeName bool, nodeName string) error {
if checkNodeName {
// Client authorization.
// Check that the node name matches the CN from the TLS cert
client, ok := peer.FromContext(c)
if !ok {
klog.Errorf("gRPC request error: failed to get peer (client)")
return fmt.Errorf("failed to get peer (client)")
}
tlsAuth, ok := client.AuthInfo.(credentials.TLSInfo)
if !ok {
klog.Errorf("gRPC request error: incorrect client credentials from '%v'", client.Addr)
return fmt.Errorf("incorrect client credentials")
}
if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
klog.Errorf("gRPC request error: client certificate verification for '%v' failed", client.Addr)
return fmt.Errorf("client certificate verification failed")
}
err := verifyNodeName(tlsAuth.State.VerifiedChains[0][0], nodeName)
if err != nil {
klog.Errorf("gRPC request error: authorization for %v failed: %v", client.Addr, err)
return err
}
}
return nil
}
func (m *nfdMaster) UpdateNodeTopology(c context.Context, r *topologypb.NodeTopologyRequest) (*topologypb.NodeTopologyResponse, error) {
err := authorizeClient(c, m.args.VerifyNodeName, r.NodeName)
if err != nil {
return &topologypb.NodeTopologyResponse{}, err
}
if klog.V(1).Enabled() {
klog.Infof("REQUEST Node: %s NFD-version: %s Topology Policy: %s", r.NodeName, r.NfdVersion, r.TopologyPolicies)
utils.KlogDump(1, "Zones received:", " ", r.Zones)
} else {
klog.Infof("received CR updation request for node %q", r.NodeName)
}
if !m.args.NoPublish {
err := m.updateCR(r.NodeName, r.TopologyPolicies, r.Zones, m.args.NRTNamespace)
if err != nil {
klog.Errorf("failed to advertise labels: %v", err)
return &topologypb.NodeTopologyResponse{}, err
}
}
return &topologypb.NodeTopologyResponse{}, nil
}
// updateNodeFeatures ensures the Kubernetes node object is up to date,
// creating new labels and extended resources where necessary and removing
// outdated ones. Also updates the corresponding annotations.
@@ -590,3 +626,77 @@ func stringToNsNames(cslist, ns string) []string {
}
return names
}
// updateMap converts a slice of protobuf CostInfo into the API (v1alpha1) representation.
func updateMap(data []*topologypb.CostInfo) []v1alpha1.CostInfo {
ret := make([]v1alpha1.CostInfo, len(data))
for i, cost := range data {
ret[i] = v1alpha1.CostInfo{
Name: cost.Name,
Value: int(cost.Value),
}
}
return ret
}
// modifyCR converts the zones received over gRPC into the API (v1alpha1) representation.
func modifyCR(topoUpdaterZones []*topologypb.Zone) []v1alpha1.Zone {
zones := make([]v1alpha1.Zone, len(topoUpdaterZones))
// TODO: Avoid copying of data to allow returning the zone info
// directly in a compatible data type (i.e. []*v1alpha1.Zone).
for i, zone := range topoUpdaterZones {
resInfo := make([]v1alpha1.ResourceInfo, len(zone.Resources))
for j, info := range zone.Resources {
resInfo[j] = v1alpha1.ResourceInfo{
Name: info.Name,
Allocatable: intstr.FromString(info.Allocatable),
Capacity: intstr.FromString(info.Capacity),
}
}
zones[i] = v1alpha1.Zone{
Name: zone.Name,
Type: zone.Type,
Parent: zone.Parent,
Costs: updateMap(zone.Costs),
Resources: resInfo,
}
}
return zones
}
func (m *nfdMaster) updateCR(hostname string, tmpolicy []string, topoUpdaterZones []*topologypb.Zone, namespace string) error {
cli, err := m.apihelper.GetTopologyClient()
if err != nil {
return err
}
zones := modifyCR(topoUpdaterZones)
nrt, err := cli.TopologyV1alpha1().NodeResourceTopologies(namespace).Get(context.TODO(), hostname, metav1.GetOptions{})
if errors.IsNotFound(err) {
nrtNew := v1alpha1.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: hostname,
},
Zones: zones,
TopologyPolicies: tmpolicy,
}
_, err := cli.TopologyV1alpha1().NodeResourceTopologies(namespace).Create(context.TODO(), &nrtNew, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create v1alpha1.NodeResourceTopology!:%w", err)
}
return nil
} else if err != nil {
return err
}
nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zones
nrtUpdated, err := cli.TopologyV1alpha1().NodeResourceTopologies(namespace).Update(context.TODO(), nrtMutated, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update v1alpha1.NodeResourceTopology!:%w", err)
}
utils.KlogDump(2, "CR instance updated resTopo:", " ", nrtUpdated)
return nil
}

pkg/podres/client.go

@@ -0,0 +1,41 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podres
import (
"fmt"
"log"
"time"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
)
const (
// the following values are taken from node e2e tests: https://github.com/kubernetes/kubernetes/blob/82baa26905c94398a0d19e1b1ecf54eb8acb6029/test/e2e_node/util.go#L70
defaultPodResourcesTimeout = 10 * time.Second
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 MiB
)
// GetPodResClient returns a client connected to the kubelet's pod resources API over the given socket.
func GetPodResClient(socketPath string) (podresourcesapi.PodResourcesListerClient, error) {
podResourceClient, _, err := podresources.GetV1Client(socketPath, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
if err != nil {
return nil, fmt.Errorf("failed to create podresource client: %w", err)
}
log.Printf("Connected to '%q'!", socketPath)
return podResourceClient, nil
}
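
A minimal, hypothetical usage sketch of the client returned above, listing
the devices and CPUs currently assigned to each container over the kubelet
socket (the socket path is the deployment default used elsewhere in this
patch; pkg/resourcemonitor consumes the client in a similar way):

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"

	"sigs.k8s.io/node-feature-discovery/pkg/podres"
)

func main() {
	client, err := podres.GetPodResClient("/var/lib/kubelet/pod-resources/kubelet.sock")
	if err != nil {
		log.Fatalf("failed to create pod resources client: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// List returns the resources currently allocated to each container.
	resp, err := client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
	if err != nil {
		log.Fatalf("failed to list pod resources: %v", err)
	}
	for _, pod := range resp.GetPodResources() {
		for _, cnt := range pod.GetContainers() {
			fmt.Printf("%s/%s/%s: %d devices, %d cpus\n",
				pod.GetNamespace(), pod.GetName(), cnt.GetName(),
				len(cnt.GetDevices()), len(cnt.GetCpuIds()))
		}
	}
}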


@@ -0,0 +1,78 @@
// Code generated by mockery v2.4.0-beta. DO NOT EDIT.
package podres
import (
context "context"
grpc "google.golang.org/grpc"
mock "github.com/stretchr/testify/mock"
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)
// MockPodResourcesListerClient is an autogenerated mock type for the PodResourcesListerClient type
type MockPodResourcesListerClient struct {
mock.Mock
}
// GetAllocatableResources provides a mock function with given fields: ctx, in, opts
func (_m *MockPodResourcesListerClient) GetAllocatableResources(ctx context.Context, in *v1.AllocatableResourcesRequest, opts ...grpc.CallOption) (*v1.AllocatableResourcesResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *v1.AllocatableResourcesResponse
if rf, ok := ret.Get(0).(func(context.Context, *v1.AllocatableResourcesRequest, ...grpc.CallOption) *v1.AllocatableResourcesResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v1.AllocatableResourcesResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1.AllocatableResourcesRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// List provides a mock function with given fields: ctx, in, opts
func (_m *MockPodResourcesListerClient) List(ctx context.Context, in *v1.ListPodResourcesRequest, opts ...grpc.CallOption) (*v1.ListPodResourcesResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *v1.ListPodResourcesResponse
if rf, ok := ret.Get(0).(func(context.Context, *v1.ListPodResourcesRequest, ...grpc.CallOption) *v1.ListPodResourcesResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v1.ListPodResourcesResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1.ListPodResourcesRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}


@@ -0,0 +1,273 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"context"
"fmt"
"time"
"github.com/jaypipes/ghw"
topologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"sigs.k8s.io/node-feature-discovery/source"
)
const (
// the timeout value is taken from node e2e tests: https://github.com/kubernetes/kubernetes/blob/82baa26905c94398a0d19e1b1ecf54eb8acb6029/test/e2e_node/util.go#L70
defaultPodResourcesTimeout = 10 * time.Second
)
type nodeResources struct {
perNUMACapacity map[int]map[v1.ResourceName]int64
// mapping: resourceName -> resourceID -> nodeID
resourceID2NUMAID map[string]map[string]int
topo *ghw.TopologyInfo
}
type resourceData struct {
allocatable int64
capacity int64
}
// NewResourcesAggregator returns a ResourcesAggregator backed by the system topology and the pod resources API.
func NewResourcesAggregator(podResourceClient podresourcesapi.PodResourcesListerClient) (ResourcesAggregator, error) {
var err error
topo, err := ghw.Topology(ghw.WithPathOverrides(ghw.PathOverrides{
"/sys": string(source.SysfsDir),
}))
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), defaultPodResourcesTimeout)
defer cancel()
// Pod Resource API client
resp, err := podResourceClient.GetAllocatableResources(ctx, &podresourcesapi.AllocatableResourcesRequest{})
if err != nil {
return nil, fmt.Errorf("can't receive response: %v.Get(_) = _, %w", podResourceClient, err)
}
return NewResourcesAggregatorFromData(topo, resp), nil
}
// NewResourcesAggregatorFromData aggregates resource information based on data received from the underlying hardware and the podresources API
func NewResourcesAggregatorFromData(topo *ghw.TopologyInfo, resp *podresourcesapi.AllocatableResourcesResponse) ResourcesAggregator {
allDevs := getContainerDevicesFromAllocatableResources(resp, topo)
return &nodeResources{
topo: topo,
resourceID2NUMAID: makeResourceMap(len(topo.Nodes), allDevs),
perNUMACapacity: makeNodeCapacity(allDevs),
}
}
// Aggregate provides the mapping (numa zone name) -> Zone from the given PodResources.
func (noderesourceData *nodeResources) Aggregate(podResData []PodResources) topologyv1alpha1.ZoneList {
perNuma := make(map[int]map[v1.ResourceName]*resourceData)
for nodeID, nodeRes := range noderesourceData.perNUMACapacity {
perNuma[nodeID] = make(map[v1.ResourceName]*resourceData)
for resName, resCap := range nodeRes {
perNuma[nodeID][resName] = &resourceData{
capacity: resCap,
allocatable: resCap,
}
}
}
for _, podRes := range podResData {
for _, contRes := range podRes.Containers {
for _, res := range contRes.Resources {
noderesourceData.updateAllocatable(perNuma, res)
}
}
}
zones := make(topologyv1alpha1.ZoneList, 0)
for nodeID, resList := range perNuma {
zone := topologyv1alpha1.Zone{
Name: makeZoneName(nodeID),
Type: "Node",
Resources: make(topologyv1alpha1.ResourceInfoList, 0),
}
costs, err := makeCostsPerNumaNode(noderesourceData.topo.Nodes, nodeID)
if err != nil {
klog.Infof("cannot find costs for NUMA node %d: %v", nodeID, err)
} else {
zone.Costs = topologyv1alpha1.CostList(costs)
}
for name, resData := range resList {
allocatableQty := *resource.NewQuantity(resData.allocatable, resource.DecimalSI)
capacityQty := *resource.NewQuantity(resData.capacity, resource.DecimalSI)
zone.Resources = append(zone.Resources, topologyv1alpha1.ResourceInfo{
Name: name.String(),
Allocatable: intstr.FromString(allocatableQty.String()),
Capacity: intstr.FromString(capacityQty.String()),
})
}
zones = append(zones, zone)
}
return zones
}
// getContainerDevicesFromAllocatableResources normalizes all compute resources to ContainerDevices.
// This is helpful because cpuIDs are not represented as ContainerDevices, but with a different format;
// having a consistent representation of all the resources as ContainerDevices makes the aggregation logic simpler.
func getContainerDevicesFromAllocatableResources(availRes *podresourcesapi.AllocatableResourcesResponse, topo *ghw.TopologyInfo) []*podresourcesapi.ContainerDevices {
var contDevs []*podresourcesapi.ContainerDevices
contDevs = append(contDevs, availRes.GetDevices()...)
cpuIDToNodeIDMap := MakeLogicalCoreIDToNodeIDMap(topo)
cpusPerNuma := make(map[int][]string)
for _, cpuID := range availRes.GetCpuIds() {
nodeID, ok := cpuIDToNodeIDMap[int(cpuID)]
if !ok {
klog.Infof("cannot find the NUMA node for CPU %d", cpuID)
continue
}
cpuIDList := cpusPerNuma[nodeID]
cpuIDList = append(cpuIDList, fmt.Sprintf("%d", cpuID))
cpusPerNuma[nodeID] = cpuIDList
}
for nodeID, cpuList := range cpusPerNuma {
contDevs = append(contDevs, &podresourcesapi.ContainerDevices{
ResourceName: string(v1.ResourceCPU),
DeviceIds: cpuList,
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{ID: int64(nodeID)},
},
},
})
}
return contDevs
}
// updateAllocatable computes the actually allocatable resources.
// This function assumes the allocatable resources are initialized to be equal to the capacity.
func (noderesourceData *nodeResources) updateAllocatable(numaData map[int]map[v1.ResourceName]*resourceData, ri ResourceInfo) {
for _, resID := range ri.Data {
resName := string(ri.Name)
resMap, ok := noderesourceData.resourceID2NUMAID[resName]
if !ok {
klog.Infof("unknown resource %q", ri.Name)
continue
}
nodeID, ok := resMap[resID]
if !ok {
klog.Infof("unknown resource %q: %q", resName, resID)
continue
}
numaData[nodeID][ri.Name].allocatable--
}
}
// makeZoneName returns the canonical name of a NUMA zone from its ID.
func makeZoneName(nodeID int) string {
return fmt.Sprintf("node-%d", nodeID)
}
// makeNodeCapacity computes the node capacity as a mapping (NUMA node ID) -> Resource -> Capacity (amount, int).
// The computation assumes that all the resources whose capacity is to be reported are represented in a slice
// of ContainerDevices. No special treatment is needed for CPU IDs; see getContainerDevicesFromAllocatableResources.
func makeNodeCapacity(devices []*podresourcesapi.ContainerDevices) map[int]map[v1.ResourceName]int64 {
perNUMACapacity := make(map[int]map[v1.ResourceName]int64)
// initialize with the capacities
for _, device := range devices {
resourceName := device.GetResourceName()
for _, node := range device.GetTopology().GetNodes() {
nodeID := int(node.GetID())
nodeRes, ok := perNUMACapacity[nodeID]
if !ok {
nodeRes = make(map[v1.ResourceName]int64)
}
nodeRes[v1.ResourceName(resourceName)] += int64(len(device.GetDeviceIds()))
perNUMACapacity[nodeID] = nodeRes
}
}
return perNUMACapacity
}
// MakeLogicalCoreIDToNodeIDMap builds the mapping (logical processor ID) -> (NUMA node ID) from the given topology.
func MakeLogicalCoreIDToNodeIDMap(topo *ghw.TopologyInfo) map[int]int {
core2node := make(map[int]int)
for _, node := range topo.Nodes {
for _, core := range node.Cores {
for _, procID := range core.LogicalProcessors {
core2node[procID] = node.ID
}
}
}
return core2node
}
// makeResourceMap creates the mapping (resource name) -> (device ID) -> (NUMA node ID) from the given slice of ContainerDevices.
// This is useful to quickly learn the NUMA node ID of a given (resource, device) pair.
func makeResourceMap(numaNodes int, devices []*podresourcesapi.ContainerDevices) map[string]map[string]int {
resourceMap := make(map[string]map[string]int)
for _, device := range devices {
resourceName := device.GetResourceName()
_, ok := resourceMap[resourceName]
if !ok {
resourceMap[resourceName] = make(map[string]int)
}
for _, node := range device.GetTopology().GetNodes() {
nodeID := int(node.GetID())
for _, deviceID := range device.GetDeviceIds() {
resourceMap[resourceName][deviceID] = nodeID
}
}
}
return resourceMap
}
// makeCostsPerNumaNode builds the cost map to reach all the known NUMA zones (mapping (numa zone) -> cost) starting from the given NUMA zone.
func makeCostsPerNumaNode(nodes []*ghw.TopologyNode, nodeIDSrc int) ([]topologyv1alpha1.CostInfo, error) {
nodeSrc := findNodeByID(nodes, nodeIDSrc)
if nodeSrc == nil {
return nil, fmt.Errorf("unknown node: %d", nodeIDSrc)
}
nodeCosts := make([]topologyv1alpha1.CostInfo, 0)
for nodeIDDst, dist := range nodeSrc.Distances {
// TODO: this assumes there are no holes (= no offline node) in the distance vector
nodeCosts = append(nodeCosts, topologyv1alpha1.CostInfo{
Name: makeZoneName(nodeIDDst),
Value: dist,
})
}
return nodeCosts, nil
}
func findNodeByID(nodes []*ghw.TopologyNode, nodeID int) *ghw.TopologyNode {
for _, node := range nodes {
if node.ID == nodeID {
return node
}
}
return nil
}


@@ -0,0 +1,497 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"encoding/json"
"log"
"sort"
"testing"
"github.com/jaypipes/ghw"
cmp "github.com/google/go-cmp/cmp"
. "github.com/smartystreets/goconvey/convey"
"k8s.io/apimachinery/pkg/util/intstr"
topologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)
func TestResourcesAggregator(t *testing.T) {
fakeTopo := ghw.TopologyInfo{}
Convey("When recovering test topology from JSON data", t, func() {
err := json.Unmarshal([]byte(testTopology), &fakeTopo)
So(err, ShouldBeNil)
})
var resAggr ResourcesAggregator
Convey("When I aggregate the node resources fake data and no pod allocation", t, func() {
availRes := &v1.AllocatableResourcesResponse{
Devices: []*v1.ContainerDevices{
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA-0"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA-1"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA-2"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA-3"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netBBB-0"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netBBB-1"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/gpu",
DeviceIds: []string{"gpuAAA"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
},
CpuIds: []int64{
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
},
}
resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes)
Convey("When aggregating resources", func() {
expected := topologyv1alpha1.ZoneList{
topologyv1alpha1.Zone{
Name: "node-0",
Type: "Node",
Costs: topologyv1alpha1.CostList{
topologyv1alpha1.CostInfo{
Name: "node-0",
Value: 10,
},
topologyv1alpha1.CostInfo{
Name: "node-1",
Value: 20,
},
},
Resources: topologyv1alpha1.ResourceInfoList{
topologyv1alpha1.ResourceInfo{
Name: "cpu",
Allocatable: intstr.FromString("12"),
Capacity: intstr.FromString("12"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/net",
Allocatable: intstr.FromString("4"),
Capacity: intstr.FromString("4"),
},
},
},
topologyv1alpha1.Zone{
Name: "node-1",
Type: "Node",
Costs: topologyv1alpha1.CostList{
topologyv1alpha1.CostInfo{
Name: "node-0",
Value: 20,
},
topologyv1alpha1.CostInfo{
Name: "node-1",
Value: 10,
},
},
Resources: topologyv1alpha1.ResourceInfoList{
topologyv1alpha1.ResourceInfo{
Name: "cpu",
Allocatable: intstr.FromString("12"),
Capacity: intstr.FromString("12"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/gpu",
Allocatable: intstr.FromString("1"),
Capacity: intstr.FromString("1"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/net",
Allocatable: intstr.FromString("4"),
Capacity: intstr.FromString("4"),
},
},
},
}
res := resAggr.Aggregate(nil) // no pods allocation
sort.Slice(res, func(i, j int) bool {
return res[i].Name < res[j].Name
})
for _, resource := range res {
sort.Slice(resource.Costs, func(x, y int) bool {
return resource.Costs[x].Name < resource.Costs[y].Name
})
}
for _, resource := range res {
sort.Slice(resource.Resources, func(x, y int) bool {
return resource.Resources[x].Name < resource.Resources[y].Name
})
}
log.Printf("result=%v", res)
log.Printf("expected=%v", expected)
log.Printf("diff=%s", cmp.Diff(res, expected))
So(cmp.Equal(res, expected), ShouldBeFalse)
})
})
Convey("When I aggregate the node resources fake data and some pod allocation", t, func() {
availRes := &v1.AllocatableResourcesResponse{
Devices: []*v1.ContainerDevices{
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netBBB"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/gpu",
DeviceIds: []string{"gpuAAA"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
},
CpuIds: []int64{
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
},
}
resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes)
Convey("When aggregating resources", func() {
podRes := []PodResources{
PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []ContainerResources{
ContainerResources{
Name: "test-cnt-0",
Resources: []ResourceInfo{
ResourceInfo{
Name: "cpu",
Data: []string{"5", "7"},
},
ResourceInfo{
Name: "fake.io/net",
Data: []string{"netBBB"},
},
},
},
},
},
}
expected := topologyv1alpha1.ZoneList{
topologyv1alpha1.Zone{
Name: "node-0",
Type: "Node",
Costs: topologyv1alpha1.CostList{
topologyv1alpha1.CostInfo{
Name: "node-0",
Value: 10,
},
topologyv1alpha1.CostInfo{
Name: "node-1",
Value: 20,
},
},
Resources: topologyv1alpha1.ResourceInfoList{
topologyv1alpha1.ResourceInfo{
Name: "cpu",
Allocatable: intstr.FromString("12"),
Capacity: intstr.FromString("12"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/net",
Allocatable: intstr.FromString("1"),
Capacity: intstr.FromString("1"),
},
},
},
topologyv1alpha1.Zone{
Name: "node-1",
Type: "Node",
Costs: topologyv1alpha1.CostList{
topologyv1alpha1.CostInfo{
Name: "node-0",
Value: 20,
},
topologyv1alpha1.CostInfo{
Name: "node-1",
Value: 10,
},
},
Resources: topologyv1alpha1.ResourceInfoList{
topologyv1alpha1.ResourceInfo{
Name: "cpu",
Allocatable: intstr.FromString("10"),
Capacity: intstr.FromString("12"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/gpu",
Allocatable: intstr.FromString("1"),
Capacity: intstr.FromString("1"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/net",
Allocatable: intstr.FromString("0"),
Capacity: intstr.FromString("1"),
},
},
},
}
res := resAggr.Aggregate(podRes)
sort.Slice(res, func(i, j int) bool {
return res[i].Name < res[j].Name
})
for _, resource := range res {
sort.Slice(resource.Costs, func(x, y int) bool {
return resource.Costs[x].Name < resource.Costs[y].Name
})
}
for _, resource := range res {
sort.Slice(resource.Resources, func(x, y int) bool {
return resource.Resources[x].Name < resource.Resources[y].Name
})
}
log.Printf("result=%v", res)
log.Printf("expected=%v", expected)
log.Printf("diff=%s", cmp.Diff(res, expected))
So(cmp.Equal(res, expected), ShouldBeTrue)
})
})
}
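To make the arithmetic in the second test explicit: per the fixture below, NUMA node 1 owns the odd-numbered logical CPUs, so assigning CPUs 5 and 7 plus device netBBB to test-pod-0 lowers node-1's allocatable cpu from 12 to 10 and its fake.io/net from 1 to 0, while capacity stays untouched. A minimal, self-contained sketch of that per-zone subtraction follows; the names are illustrative and this is not the aggregator's actual implementation.

package main

import "fmt"

func main() {
	// Per-zone capacity from the fake data; allocatable starts equal to it.
	capacity := map[string]map[string]int{
		"node-0": {"cpu": 12, "fake.io/net": 1},
		"node-1": {"cpu": 12, "fake.io/gpu": 1, "fake.io/net": 1},
	}
	// Resources consumed by test-pod-0, attributed to the zone they live on:
	// CPUs 5 and 7 and device netBBB all sit on NUMA node 1.
	consumed := map[string]map[string]int{
		"node-1": {"cpu": 2, "fake.io/net": 1},
	}
	for zone, caps := range capacity {
		for name, total := range caps {
			allocatable := total - consumed[zone][name] // reading a nil inner map yields 0
			fmt.Printf("%s %s: allocatable=%d capacity=%d\n", zone, name, allocatable, total)
		}
	}
}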
// ghwc topology -f json
var testTopology string = `{
"nodes": [
{
"id": 0,
"cores": [
{
"id": 0,
"index": 0,
"total_threads": 2,
"logical_processors": [
0,
12
]
},
{
"id": 10,
"index": 1,
"total_threads": 2,
"logical_processors": [
10,
22
]
},
{
"id": 1,
"index": 2,
"total_threads": 2,
"logical_processors": [
14,
2
]
},
{
"id": 2,
"index": 3,
"total_threads": 2,
"logical_processors": [
16,
4
]
},
{
"id": 8,
"index": 4,
"total_threads": 2,
"logical_processors": [
18,
6
]
},
{
"id": 9,
"index": 5,
"total_threads": 2,
"logical_processors": [
20,
8
]
}
],
"distances": [
10,
20
]
},
{
"id": 1,
"cores": [
{
"id": 0,
"index": 0,
"total_threads": 2,
"logical_processors": [
1,
13
]
},
{
"id": 10,
"index": 1,
"total_threads": 2,
"logical_processors": [
11,
23
]
},
{
"id": 1,
"index": 2,
"total_threads": 2,
"logical_processors": [
15,
3
]
},
{
"id": 2,
"index": 3,
"total_threads": 2,
"logical_processors": [
17,
5
]
},
{
"id": 8,
"index": 4,
"total_threads": 2,
"logical_processors": [
19,
7
]
},
{
"id": 9,
"index": 5,
"total_threads": 2,
"logical_processors": [
21,
9
]
}
],
"distances": [
20,
10
]
}
]
}`
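For reference, the fixture above describes two NUMA nodes with six physical cores each and two threads per core, i.e. twelve logical CPUs per node, which is where the per-zone cpu capacity of 12 in the tests comes from. Each node's distances row (10 to itself, 20 to its peer) is exactly what surfaces as the CostList entries in the expected zones. A small self-contained sketch of that mapping, using a local illustrative type rather than the topology API:

package main

import "fmt"

// costEntry mirrors the Name/Value shape asserted in the expected zones.
type costEntry struct {
	Name  string
	Value int
}

// costsFromDistances turns one node's ghw "distances" row into cost entries:
// the slice index is the peer node ID, the element is the distance.
func costsFromDistances(distances []int) []costEntry {
	costs := make([]costEntry, 0, len(distances))
	for peer, d := range distances {
		costs = append(costs, costEntry{Name: fmt.Sprintf("node-%d", peer), Value: d})
	}
	return costs
}

func main() {
	fmt.Println(costsFromDistances([]int{10, 20})) // node 0's row: [{node-0 10} {node-1 20}]
}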

View file

@ -0,0 +1,120 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)
// PodResourcesScanner scans pod resource assignments through the kubelet
// podresources API, optionally restricted to a single namespace.
type PodResourcesScanner struct {
namespace string
podResourceClient podresourcesapi.PodResourcesListerClient
}
// NewPodResourcesScanner creates a ResourcesScanner that watches the given
// namespace ("*" meaning all namespaces) via the supplied podresources client.
func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi.PodResourcesListerClient) (ResourcesScanner, error) {
resourcemonitorInstance := &PodResourcesScanner{
namespace: namespace,
podResourceClient: podResourceClient,
}
if resourcemonitorInstance.namespace != "*" {
klog.Infof("watching namespace %q", resourcemonitorInstance.namespace)
} else {
klog.Infof("watching all namespaces")
}
return resourcemonitorInstance, nil
}
// isWatchable tells if the given namespace should be watched.
func (resMon *PodResourcesScanner) isWatchable(podNamespace string) bool {
if resMon.namespace == "*" {
return true
}
// TODO: add an explicit check for guaranteed pods
return resMon.namespace == podNamespace
}
// Scan gathers all the PodResources from the system, using the podresources API client.
func (resMon *PodResourcesScanner) Scan() ([]PodResources, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultPodResourcesTimeout)
defer cancel()
// Pod Resource API client
resp, err := resMon.podResourceClient.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
		return nil, fmt.Errorf("can't receive response from %v.List(_): %w", resMon.podResourceClient, err)
}
var podResData []PodResources
for _, podResource := range resp.GetPodResources() {
if !resMon.isWatchable(podResource.GetNamespace()) {
continue
}
podRes := PodResources{
Name: podResource.GetName(),
Namespace: podResource.GetNamespace(),
}
for _, container := range podResource.GetContainers() {
contRes := ContainerResources{
Name: container.Name,
}
cpuIDs := container.GetCpuIds()
if len(cpuIDs) > 0 {
var resCPUs []string
for _, cpuID := range container.GetCpuIds() {
resCPUs = append(resCPUs, fmt.Sprintf("%d", cpuID))
}
contRes.Resources = []ResourceInfo{
{
Name: v1.ResourceCPU,
Data: resCPUs,
},
}
}
for _, device := range container.GetDevices() {
contRes.Resources = append(contRes.Resources, ResourceInfo{
Name: v1.ResourceName(device.ResourceName),
Data: device.DeviceIds,
})
}
if len(contRes.Resources) == 0 {
continue
}
podRes.Containers = append(podRes.Containers, contRes)
}
if len(podRes.Containers) == 0 {
continue
}
podResData = append(podResData, podRes)
}
return podResData, nil
}
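A hedged usage sketch for the scanner above, written as if it lived in package resourcemonitor (it reuses this file's imports); obtaining a connected podresources client is assumed to happen elsewhere, e.g. by dialing the kubelet's pod-resources socket:

// scanOnce is illustrative only, not part of this file: run one scan pass
// and log what the kubelet reported for each watchable pod.
func scanOnce(client podresourcesapi.PodResourcesListerClient) error {
	scanner, err := NewPodResourcesScanner("*", client) // "*" watches all namespaces
	if err != nil {
		return err
	}
	podResources, err := scanner.Scan()
	if err != nil {
		return err
	}
	for _, pod := range podResources {
		klog.Infof("pod %s/%s has %d container(s) with assigned resources",
			pod.Namespace, pod.Name, len(pod.Containers))
	}
	return nil
}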

View file

@ -0,0 +1,381 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"fmt"
"reflect"
"testing"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"sigs.k8s.io/node-feature-discovery/pkg/podres"
)
func TestPodScanner(t *testing.T) {
var resScan ResourcesScanner
var err error
Convey("When I scan for pod resources using fake client and no namespace", t, func() {
mockPodResClient := new(podres.MockPodResourcesListerClient)
resScan, err = NewPodResourcesScanner("*", mockPodResClient)
Convey("Creating a Resources Scanner using a mock client", func() {
So(err, ShouldBeNil)
})
Convey("When I get error", func() {
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(nil, fmt.Errorf("fake error"))
res, err := resScan.Scan()
Convey("Error is present", func() {
So(err, ShouldNotBeNil)
})
Convey("Return PodResources should be nil", func() {
So(res, ShouldBeNil)
})
})
Convey("When I successfully get empty response", func() {
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(&v1.ListPodResourcesResponse{}, nil)
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should be zero", func() {
So(len(res), ShouldEqual, 0)
})
})
Convey("When I successfully get valid response", func() {
resp := &v1.ListPodResourcesResponse{
PodResources: []*v1.PodResources{
&v1.PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []*v1.ContainerResources{
&v1.ContainerResources{
Name: "test-cnt-0",
Devices: []*v1.ContainerDevices{
&v1.ContainerDevices{
ResourceName: "fake.io/resource",
DeviceIds: []string{"devA"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{ID: 0},
},
},
},
},
CpuIds: []int64{0, 1},
},
},
},
},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should have values", func() {
So(len(res), ShouldBeGreaterThan, 0)
expected := []PodResources{
PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []ContainerResources{
ContainerResources{
Name: "test-cnt-0",
Resources: []ResourceInfo{
ResourceInfo{
Name: "cpu",
Data: []string{"0", "1"},
},
ResourceInfo{
Name: "fake.io/resource",
Data: []string{"devA"},
},
},
},
},
},
}
So(reflect.DeepEqual(res, expected), ShouldBeTrue)
})
})
Convey("When I successfully get valid response without topology", func() {
resp := &v1.ListPodResourcesResponse{
PodResources: []*v1.PodResources{
&v1.PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []*v1.ContainerResources{
&v1.ContainerResources{
Name: "test-cnt-0",
Devices: []*v1.ContainerDevices{
&v1.ContainerDevices{
ResourceName: "fake.io/resource",
DeviceIds: []string{"devA"},
},
},
CpuIds: []int64{0, 1},
},
},
},
},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should have values", func() {
So(len(res), ShouldBeGreaterThan, 0)
expected := []PodResources{
PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []ContainerResources{
ContainerResources{
Name: "test-cnt-0",
Resources: []ResourceInfo{
ResourceInfo{
Name: "cpu",
Data: []string{"0", "1"},
},
ResourceInfo{
Name: "fake.io/resource",
Data: []string{"devA"},
},
},
},
},
},
}
So(reflect.DeepEqual(res, expected), ShouldBeTrue)
})
})
Convey("When I successfully get valid response without devices", func() {
resp := &v1.ListPodResourcesResponse{
PodResources: []*v1.PodResources{
&v1.PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []*v1.ContainerResources{
&v1.ContainerResources{
Name: "test-cnt-0",
CpuIds: []int64{0, 1},
},
},
},
},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should have values", func() {
So(len(res), ShouldBeGreaterThan, 0)
expected := []PodResources{
PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []ContainerResources{
ContainerResources{
Name: "test-cnt-0",
Resources: []ResourceInfo{
ResourceInfo{
Name: "cpu",
Data: []string{"0", "1"},
},
},
},
},
},
}
So(reflect.DeepEqual(res, expected), ShouldBeTrue)
})
})
Convey("When I successfully get valid response without cpus", func() {
resp := &v1.ListPodResourcesResponse{
PodResources: []*v1.PodResources{
&v1.PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []*v1.ContainerResources{
&v1.ContainerResources{
Name: "test-cnt-0",
Devices: []*v1.ContainerDevices{
&v1.ContainerDevices{
ResourceName: "fake.io/resource",
DeviceIds: []string{"devA"},
},
},
},
},
},
},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should have values", func() {
So(len(res), ShouldBeGreaterThan, 0)
expected := []PodResources{
PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []ContainerResources{
ContainerResources{
Name: "test-cnt-0",
Resources: []ResourceInfo{
ResourceInfo{
Name: "fake.io/resource",
Data: []string{"devA"},
},
},
},
},
},
}
So(reflect.DeepEqual(res, expected), ShouldBeTrue)
})
})
Convey("When I successfully get valid response without resources", func() {
resp := &v1.ListPodResourcesResponse{
PodResources: []*v1.PodResources{
&v1.PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []*v1.ContainerResources{
&v1.ContainerResources{
Name: "test-cnt-0",
},
},
},
},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should not have values", func() {
So(len(res), ShouldEqual, 0)
})
})
})
Convey("When I scan for pod resources using fake client and given namespace", t, func() {
mockPodResClient := new(podres.MockPodResourcesListerClient)
resScan, err = NewPodResourcesScanner("pod-res-test", mockPodResClient)
Convey("Creating a Resources Scanner using a mock client", func() {
So(err, ShouldBeNil)
})
Convey("When I get error", func() {
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(nil, fmt.Errorf("fake error"))
res, err := resScan.Scan()
Convey("Error is present", func() {
So(err, ShouldNotBeNil)
})
Convey("Return PodResources should be nil", func() {
So(res, ShouldBeNil)
})
})
Convey("When I successfully get empty response", func() {
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(&v1.ListPodResourcesResponse{}, nil)
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should be zero", func() {
So(len(res), ShouldEqual, 0)
})
})
Convey("When I successfully get valid response", func() {
resp := &v1.ListPodResourcesResponse{
PodResources: []*v1.PodResources{
&v1.PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []*v1.ContainerResources{
&v1.ContainerResources{
Name: "test-cnt-0",
Devices: []*v1.ContainerDevices{
&v1.ContainerDevices{
ResourceName: "fake.io/resource",
DeviceIds: []string{"devA"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{ID: 0},
},
},
},
},
CpuIds: []int64{0, 1},
},
},
},
},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should be zero", func() {
So(len(res), ShouldEqual, 0)
})
})
})
}

View file

@ -0,0 +1,62 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"time"
corev1 "k8s.io/api/core/v1"
topologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
)
// Args stores command-line arguments used for resource monitoring
type Args struct {
PodResourceSocketPath string
SleepInterval time.Duration
Namespace string
KubeletConfigFile string
}
// ResourceInfo stores a resource name and the corresponding IDs obtained from the podresources API
type ResourceInfo struct {
Name corev1.ResourceName
Data []string
}
// ContainerResources contains information about the node resources assigned to a container
type ContainerResources struct {
Name string
Resources []ResourceInfo
}
// PodResources contains information about the node resources assigned to a pod
type PodResources struct {
Name string
Namespace string
Containers []ContainerResources
}
// ResourcesScanner gathers all the PodResources from the system, using the podresources API client
type ResourcesScanner interface {
Scan() ([]PodResources, error)
}
// ResourcesAggregator aggregates resource information from the underlying hardware and the podresources API
type ResourcesAggregator interface {
Aggregate(podResData []PodResources) topologyv1alpha1.ZoneList
}
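The two interfaces above are meant to compose into the updater's polling loop. A sketch of that shape, assuming an fmt import for output (the loop structure is an assumption for illustration, not this package's actual main loop):

// runLoop is illustrative only: scan pod allocations, aggregate them into
// per-NUMA-zone resource info, and repeat every Args.SleepInterval.
func runLoop(args Args, scanner ResourcesScanner, aggregator ResourcesAggregator) {
	for {
		podResources, err := scanner.Scan()
		if err != nil {
			fmt.Printf("scan failed: %v\n", err) // real code would log and keep going
		} else {
			zones := aggregator.Aggregate(podResources)
			fmt.Printf("computed %d zone(s)\n", len(zones))
			// the zones would next be shipped to nfd-master, which creates the CRs
		}
		time.Sleep(args.SleepInterval)
	}
}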

View file

@ -0,0 +1,46 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologypolicy
import (
v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/apis/config"
)
// DetectTopologyPolicy returns a single TopologyManagerPolicy value
// representing both the Topology Manager policy and its scope.
func DetectTopologyPolicy(policy string, scope string) v1alpha1.TopologyManagerPolicy {
switch policy {
case config.SingleNumaNodeTopologyManagerPolicy:
if scope == config.PodTopologyManagerScope {
return v1alpha1.SingleNUMANodePodLevel
} else if scope == config.ContainerTopologyManagerScope {
return v1alpha1.SingleNUMANodeContainerLevel
} else {
// default scope for single-numa-node
return v1alpha1.SingleNUMANodeContainerLevel
}
case config.RestrictedTopologyManagerPolicy:
return v1alpha1.Restricted
case config.BestEffortTopologyManagerPolicy:
return v1alpha1.BestEffort
case config.NoneTopologyManagerPolicy:
return v1alpha1.None
default:
return v1alpha1.None
}
}
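Expected behavior of the switch above, assuming the kubelet's canonical string values for these config constants ("single-numa-node", "restricted", "best-effort", "none"; scopes "pod" and "container"):

// Illustrative mapping, not an exhaustive test:
//
//	DetectTopologyPolicy("single-numa-node", "pod")       -> SingleNUMANodePodLevel
//	DetectTopologyPolicy("single-numa-node", "container") -> SingleNUMANodeContainerLevel
//	DetectTopologyPolicy("single-numa-node", "")          -> SingleNUMANodeContainerLevel (default scope)
//	DetectTopologyPolicy("restricted", "pod")             -> Restricted
//	DetectTopologyPolicy("bogus", "container")            -> None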

View file

@ -30,6 +30,8 @@ var (
SysfsDir = HostDir(pathPrefix + "sys")
	// UsrDir is where the /usr directory of the system to be inspected is located
	UsrDir = HostDir(pathPrefix + "usr")
	// VarDir is where the /var directory of the system to be inspected is located
	VarDir = HostDir(pathPrefix + "var")
)
// HostDir is a helper for handling host system directories

View file

@ -0,0 +1,42 @@
apiVersion: kubelet.config.k8s.io/v1beta1
authentication:
anonymous:
enabled: false
webhook:
cacheTTL: 0s
enabled: true
x509:
clientCAFile: /etc/kubernetes/pki/ca.crt
authorization:
mode: Webhook
webhook:
cacheAuthorizedTTL: 0s
cacheUnauthorizedTTL: 0s
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
cpuManagerPolicy: static
cpuManagerReconcilePeriod: 0s
evictionHard:
imagefs.available: 0%
nodefs.available: 0%
nodefs.inodesFree: 0%
evictionPressureTransitionPeriod: 0s
fileCheckFrequency: 0s
healthzBindAddress: 127.0.0.1
healthzPort: 10248
httpCheckFrequency: 0s
imageGCHighThresholdPercent: 100
imageMinimumGCAge: 0s
kind: KubeletConfiguration
logging: {}
nodeStatusReportFrequency: 0s
nodeStatusUpdateFrequency: 0s
reservedSystemCPUs: 1,3
rotateCertificates: true
runtimeRequestTimeout: 0s
staticPodPath: /etc/kubernetes/manifests
streamingConnectionIdleTimeout: 0s
syncFrequency: 0s
topologyManagerPolicy: single-numa-node
volumeStatsAggPeriod: 0s

View file

@ -0,0 +1,12 @@
-----BEGIN CERTIFICATE-----
MIIBwzCCAW2gAwIBAgIJAMRplUIVEGN7MA0GCSqGSIb3DQEBCwUAMFsxCzAJBgNV
BAYTAkZJMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQxFDASBgNVBAMMC25mZC10ZXN0LWNhMB4XDTIwMTExODEz
NTIxOFoXDTMwMTExNjEzNTIxOFowDTELMAkGA1UEBhMCRkkwgZ8wDQYJKoZIhvcN
AQEBBQADgY0AMIGJAoGBAOX+AzRK17SFuhgSeWrf+B8SZUEdBhZGjBK6ypmAZgMW
6JWwTbsdJRU6uorTX2XvgOtRolXbwSTwWjFpkvYgK0Eo8kKLU5tOE6XYi04UVYRv
Ha26pQKpfseUug641GOtUlWYUgGmvSMikEIilh+b/aAG+5/K+fNqi08fLmX6t2XL
AgMBAAGjHjAcMBoGA1UdEQQTMBGCD25mZC10ZXN0LXdvcmtlcjANBgkqhkiG9w0B
AQsFAANBAGuSwyPjduIQz4n7L+tRPs3+XPJ8fvbzC0pADYA0geow/m+X784If8nT
Pj+8quQn9zPsaQ+1bNahobTlHRmQcPE=
-----END CERTIFICATE-----

View file

@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBAOX+AzRK17SFuhgS
eWrf+B8SZUEdBhZGjBK6ypmAZgMW6JWwTbsdJRU6uorTX2XvgOtRolXbwSTwWjFp
kvYgK0Eo8kKLU5tOE6XYi04UVYRvHa26pQKpfseUug641GOtUlWYUgGmvSMikEIi
lh+b/aAG+5/K+fNqi08fLmX6t2XLAgMBAAECgYEAqsA7gMdP/iaKUvTkUASYIfl2
UzFJI6CcvgsP/4bkNcb8RqXuD81Dis9fT1I+sV9vR0YET9onO1V2oNjQ0wpvETjO
bk5veRfqFLOTavl64pAPGLEvOTWHSHQ9rtFZst1FFfehB1Vw69nBs9E40Zo2Y9yv
gkK+RIKUc2oPqMOigQECQQD8k2jxRX1COF+GO+pBTOTAr3pAmd0ahWAQGoqLwteQ
x+ARRZss1nuX0IGEyJ89hD6dHLv/FhhUxGE1R0xdQ31JAkEA6Rw5VYrAphATPCCX
h2gboAbHTOFAzwjnlW1pU6nlZI89kDAD3TF8d+eq906J8y7ji0YE89/G4HEzHqtQ
vMsucwJBAId2VAlauJRkga8PwVKmd+Vz98BgBTqtH9ljMr1EkbK/0EfTKieBHSZO
GLjrlKQ8ogxHlfh4lDIaZPxbMfSvNqkCQDkEfEmeHK0BtZK5bhbisg8cWVdGqXF6
fhqgnmimX8OO/cHs3KUX25gAhGLlRPzEdUe1orR8AcsYJSbVRHRJRl0CQQC7VBgp
04kaZzLFf61TmqMGVDoG2Wi5HwXYyzAEEEYFW61kwfZ6vuq3AP7NPMfW1F94welg
8LfkI2NBgjyKGiqn
-----END PRIVATE KEY-----