Mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git

Merge pull request #525 from k8stopologyawareschedwg/topology-updater-implementation

Introducing NFD Topology Updater exposing Resource hardware Topology info through CRs
Commit 16139ae1f5, merged by Kubernetes Prow Robot on 2021-09-21 03:04:23 -07:00 (committed via GitHub)
50 changed files with 4199 additions and 29 deletions

@@ -102,9 +102,11 @@ mock:
mockery --name=LabelSource --dir=source --inpkg --note="Re-generate by running 'make mock'"
mockery --name=APIHelpers --dir=pkg/apihelper --inpkg --note="Re-generate by running 'make mock'"
mockery --name=LabelerClient --dir=pkg/labeler --inpkg --note="Re-generate by running 'make mock'"
mockery --name=NodeTopologyClient --dir=pkg/topologyupdater --inpkg --note="Re-generate by running 'make mock'"
apigen:
protoc --go_opt=paths=source_relative --go_out=plugins=grpc:. pkg/labeler/labeler.proto
protoc --go_opt=paths=source_relative --go_out=plugins=grpc:. pkg/topologyupdater/topology-updater.proto
gofmt:
@$(GO_FMT) -w -l $$(find . -name '*.go')

@@ -105,6 +105,9 @@ func initFlags(flagset *flag.FlagSet) *master.Args {
flagset.BoolVar(&args.VerifyNodeName, "verify-node-name", false,
"Verify worker node name against the worker's TLS certificate. "+
"Only takes effect when TLS authentication has been enabled.")
flagset.StringVar(&args.NRTNamespace, "nrt-namespace", "default",
"Namespace in which Node Resource Topology CRs are created. "+
"Ensure that the specified namespace already exists.")
return args
}

@@ -0,0 +1,130 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"os"
"time"
"k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/kubeconf"
topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/topology-updater"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/node-feature-discovery/source"
)
const (
// ProgramName is the canonical name of this program
ProgramName = "nfd-topology-updater"
)
func main() {
flags := flag.NewFlagSet(ProgramName, flag.ExitOnError)
printVersion := flags.Bool("version", false, "Print version and exit.")
args, resourcemonitorArgs := parseArgs(flags, os.Args[1:]...)
if *printVersion {
fmt.Println(ProgramName, version.Get())
os.Exit(0)
}
// Assert that the version is known
if version.Undefined() {
klog.Warningf("version not set! Set -ldflags \"-X sigs.k8s.io/node-feature-discovery/pkg/version.version=`git describe --tags --dirty --always`\" during build or run.")
}
// Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog()
klConfig, err := kubeconf.GetKubeletConfigFromLocalFile(resourcemonitorArgs.KubeletConfigFile)
if err != nil {
klog.Fatalf("error reading kubelet config: %v", err)
}
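// Note (added for orientation): DetectTopologyPolicy combines the kubelet
// Topology Manager policy and scope into a single NRT API policy string,
// e.g. "single-numa-node" with "container" scope is expected to map to
// SingleNUMANodeContainerLevel (assumption based on the noderesourcetopology-api
// constants; see pkg/topologypolicy for the exact mapping).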
tmPolicy := string(topologypolicy.DetectTopologyPolicy(klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope))
klog.Infof("detected kubelet Topology Manager policy %q", tmPolicy)
// Get new TopologyUpdater instance
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, tmPolicy)
if err != nil {
klog.Exitf("failed to initialize TopologyUpdater instance: %v", err)
}
if err = instance.Run(); err != nil {
klog.Exit(err)
}
}
func parseArgs(flags *flag.FlagSet, osArgs ...string) (*topology.Args, *resourcemonitor.Args) {
args, resourcemonitorArgs := initFlags(flags)
_ = flags.Parse(osArgs)
if len(flags.Args()) > 0 {
fmt.Fprintf(flags.Output(), "unknown command line argument: %s\n", flags.Args()[0])
flags.Usage()
os.Exit(2)
}
return args, resourcemonitorArgs
}
func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
args := &topology.Args{}
resourcemonitorArgs := &resourcemonitor.Args{}
flagset.StringVar(&args.CaFile, "ca-file", "",
"Root certificate for verifying connections")
flagset.StringVar(&args.CertFile, "cert-file", "",
"Certificate used for authenticating connections")
flagset.StringVar(&args.KeyFile, "key-file", "",
"Private key matching -cert-file")
flagset.BoolVar(&args.Oneshot, "oneshot", false,
"Update once and exit")
flagset.BoolVar(&args.NoPublish, "no-publish", false,
"Do not publish discovered features to the cluster-local Kubernetes API server.")
flagset.StringVar(&args.KubeConfigFile, "kubeconfig", "",
"Kube config file.")
flagset.DurationVar(&resourcemonitorArgs.SleepInterval, "sleep-interval", time.Duration(60)*time.Second,
"Time to sleep between CR updates. Non-positive value implies no CR updatation (i.e. infinite sleep). [Default: 60s]")
flagset.StringVar(&resourcemonitorArgs.Namespace, "watch-namespace", "*",
"Namespace to watch pods (for testing/debugging purpose). Use * for all namespaces.")
flagset.StringVar(&resourcemonitorArgs.KubeletConfigFile, "kubelet-config-file", source.VarDir.Path("lib/kubelet/config.yaml"),
"Kubelet config file path.")
flagset.StringVar(&resourcemonitorArgs.PodResourceSocketPath, "podresources-socket", source.VarDir.Path("lib/kubelet/pod-resources/kubelet.sock"),
"Pod Resource Socket path to use.")
flagset.StringVar(&args.Server, "server", "localhost:8080",
"NFD server address to connecto to.")
flagset.StringVar(&args.ServerNameOverride, "server-name-override", "",
"Hostname expected from server certificate, useful in testing")
initKlogFlags(flagset)
return args, resourcemonitorArgs
}
func initKlogFlags(flagset *flag.FlagSet) {
flags := flag.NewFlagSet("klog flags", flag.ExitOnError)
//flags.SetOutput(ioutil.Discard)
klog.InitFlags(flags)
}

@@ -0,0 +1,104 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestArgsParse(t *testing.T) {
Convey("When parsing command line arguments", t, func() {
flags := flag.NewFlagSet(ProgramName, flag.ExitOnError)
Convey("When --no-publish and --oneshot flags are passed", func() {
args, finderArgs := parseArgs(flags, "--oneshot", "--no-publish")
Convey("noPublish is set and args.sources is set to the default value", func() {
So(args.NoPublish, ShouldBeTrue)
So(args.Oneshot, ShouldBeTrue)
So(finderArgs.SleepInterval, ShouldEqual, 60*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/var/lib/kubelet/config.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})
Convey("When valid args are specified for --kubelet-config-file and --sleep-interval,", func() {
args, finderArgs := parseArgs(flags,
"--kubelet-config-file=/path/testconfig.yaml",
"--sleep-interval=30s")
Convey("args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeFalse)
So(args.Oneshot, ShouldBeFalse)
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/path/testconfig.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})
Convey("When valid args are specified for --podresources-socket flag and --sleep-interval is specified", func() {
args, finderArgs := parseArgs(flags,
"--podresources-socket=/path/testkubelet.sock",
"--sleep-interval=30s")
Convey("args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeFalse)
So(args.Oneshot, ShouldBeFalse)
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/var/lib/kubelet/config.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/path/testkubelet.sock")
})
})
Convey("When valid args are specified for--sysfs and --sleep-inteval is specified", func() {
args, finderArgs := parseArgs(flags,
"--sleep-interval=30s")
Convey("args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeFalse)
So(args.Oneshot, ShouldBeFalse)
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/var/lib/kubelet/config.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})
Convey("When All valid args are specified", func() {
args, finderArgs := parseArgs(flags,
"--no-publish",
"--sleep-interval=30s",
"--kubelet-config-file=/path/testconfig.yaml",
"--podresources-socket=/path/testkubelet.sock",
"--ca-file=ca",
"--cert-file=crt",
"--key-file=key")
Convey("--no-publish is set and args.sources is set to appropriate values", func() {
So(args.NoPublish, ShouldBeTrue)
So(args.CaFile, ShouldEqual, "ca")
So(args.CertFile, ShouldEqual, "crt")
So(args.KeyFile, ShouldEqual, "key")
So(finderArgs.SleepInterval, ShouldEqual, 30*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/path/testconfig.yaml")
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/path/testkubelet.sock")
})
})
})
}

@@ -0,0 +1,5 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- noderesourcetopologies.yaml

@@ -0,0 +1,144 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
api-approved.kubernetes.io: https://github.com/kubernetes/enhancements/pull/1870
controller-gen.kubebuilder.io/version: v0.6.0
creationTimestamp: null
name: noderesourcetopologies.topology.node.k8s.io
namespace: ""
spec:
group: topology.node.k8s.io
names:
kind: NodeResourceTopology
listKind: NodeResourceTopologyList
plural: noderesourcetopologies
shortNames:
- node-res-topo
singular: noderesourcetopology
scope: Namespaced
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: NodeResourceTopology describes node resources and their topology.
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
topologyPolicies:
items:
type: string
type: array
zones:
description: ZoneList contains an array of Zone objects.
items:
description: Zone represents a resource topology zone, e.g. socket,
node, die or core.
properties:
attributes:
description: AttributeList contains an array of AttributeInfo objects.
items:
description: AttributeInfo contains one attribute of a Zone.
properties:
name:
type: string
value:
type: string
required:
- name
- value
type: object
type: array
costs:
description: CostList contains an array of CostInfo objects.
items:
description: CostInfo describes the cost (or distance) between
two Zones.
properties:
name:
type: string
value:
format: int64
type: integer
required:
- name
- value
type: object
type: array
name:
type: string
parent:
type: string
resources:
description: ResourceInfoList contains an array of ResourceInfo
objects.
items:
description: ResourceInfo contains information about one resource
type.
properties:
allocatable:
anyOf:
- type: integer
- type: string
description: Allocatable quantity of the resource, corresponding
to allocatable in node status, i.e. total amount of this
resource available to be used by pods.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
available:
anyOf:
- type: integer
- type: string
description: Available is the amount of this resource currently
available for new (to be scheduled) pods, i.e. Allocatable
minus the resources reserved by currently running pods.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
capacity:
anyOf:
- type: integer
- type: string
description: Capacity of the resource, corresponding to capacity
in node status, i.e. total amount of this resource that
the node has.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
name:
description: Name of the resource.
type: string
required:
- allocatable
- available
- capacity
- name
type: object
type: array
type:
type: string
required:
- name
- type
type: object
type: array
required:
- topologyPolicies
- zones
type: object
served: true
storage: true
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
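For orientation, a minimal object satisfying this schema could be built with the NRT Go API roughly as in the sketch below (illustrative only; the node name, zone name and quantities are made-up values, and the CR name conventionally matches the node name as in nfd-master's updateCR further down):

import (
	v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

nrt := v1alpha1.NodeResourceTopology{
	ObjectMeta:       metav1.ObjectMeta{Name: "worker-0"},
	TopologyPolicies: []string{"SingleNUMANodeContainerLevel"},
	Zones: v1alpha1.ZoneList{
		{
			Name: "node-0",
			Type: "Node",
			Resources: v1alpha1.ResourceInfoList{
				// capacity >= allocatable >= available, per the field descriptions above
				{Name: "cpu", Capacity: resource.MustParse("4"), Allocatable: resource.MustParse("4"), Available: resource.MustParse("2")},
			},
		},
	},
}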

@@ -0,0 +1,9 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: node-feature-discovery
resources:
- topologyupdater-serviceaccount.yaml
- topologyupdater-clusterrole.yaml
- topologyupdater-clusterrolebinding.yaml

@@ -0,0 +1,18 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: nfd-topology-updater
rules:
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- apiGroups:
- ""
resources:
- pods
verbs:
- get

@@ -0,0 +1,12 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: nfd-topology-updater
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: nfd-topology-updater
subjects:
- kind: ServiceAccount
name: nfd-topology-updater
namespace: default

@@ -0,0 +1,4 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: nfd-topology-updater

@@ -11,3 +11,12 @@ rules:
- get
- patch
- update
- list
- apiGroups:
- topology.node.k8s.io
resources:
- noderesourcetopologies
verbs:
- create
- get
- update

@@ -0,0 +1,7 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: node-feature-discovery
resources:
- topologyupdater-daemonset.yaml

@@ -0,0 +1,29 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
labels:
app: nfd
name: nfd-topology-updater
spec:
selector:
matchLabels:
app: nfd-topology-updater
template:
metadata:
labels:
app: nfd-topology-updater
spec:
dnsPolicy: ClusterFirstWithHostNet
serviceAccount: nfd-topology-updater
containers:
- name: nfd-topology-updater
image: gcr.io/k8s-staging-nfd/node-feature-discovery:master
imagePullPolicy: Always
command:
- "nfd-topology-updater"
args:
- "--kubelet-config-file=/host-var/lib/kubelet/config.yaml"
- "--podresources-socket=/host-var/lib/kubelet/pod-resources/kubelet.sock"
- "--sleep-interval=3s"
- "--watch-namespace=*"
- "--server=nfd-master:8080"

@@ -0,0 +1,7 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: node-feature-discovery
resources:
- topologyupdater-job.yaml

@@ -0,0 +1,39 @@
apiVersion: batch/v1
kind: Job
metadata:
labels:
app: nfd
name: nfd-topology-updater
spec:
completions: NUM_NODES
parallelism: NUM_NODES
template:
metadata:
labels:
app: nfd-topology-updater
spec:
dnsPolicy: ClusterFirstWithHostNet
serviceAccount: nfd-topology-updater
restartPolicy: Never
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- topologyKey: kubernetes.io/hostname
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- nfd-topology-updater
containers:
- name: nfd-topology-updater
image: gcr.io/k8s-staging-nfd/node-feature-discovery:master
imagePullPolicy: Always
command:
- "nfd-topology-updater"
args:
- "--kubelet-config-file=/host-var/lib/kubelet/config.yaml"
- "--podresources-socket=/host-var/lib/kubelet/pod-resources/kubelet.sock"
- "--sleep-interval=3s"
- "--watch-namespace=*"
- "--server=nfd-master:8080"

@@ -0,0 +1,12 @@
apiVersion: kustomize.config.k8s.io/v1alpha1
kind: Component
patches:
- path: topologyupdater-securitycontext.yaml
target:
labelSelector: app=nfd
name: nfd-topology-updater
- path: topologyupdater-mounts.yaml
target:
labelSelector: app=nfd
name: nfd-topology-updater

@@ -0,0 +1,21 @@
- op: add
path: /spec/template/spec/volumes
value:
- name: host-sys
hostPath:
path: "/sys"
- name: kubelet-podresources-conf
hostPath:
path: /var/lib/kubelet/config.yaml
- name: kubelet-podresources-sock
hostPath:
path: /var/lib/kubelet/pod-resources/kubelet.sock
- op: add
path: /spec/template/spec/containers/0/volumeMounts
value:
- name: kubelet-podresources-conf
mountPath: /host-var/lib/kubelet/config.yaml
- name: kubelet-podresources-sock
mountPath: /host-var/lib/kubelet/pod-resources/kubelet.sock
- name: host-sys
mountPath: /host-sys

@@ -0,0 +1,8 @@
- op: add
path: "/spec/template/spec/containers/0/securityContext"
value:
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
readOnlyRootFilesystem: true
runAsUser: 0

@@ -0,0 +1,20 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: node-feature-discovery
bases:
- ../../base/rbac
- ../../base/rbac-topologyupdater
- ../../base/master
- ../../base/worker-daemonset
- ../../base/noderesourcetopologies-crd
- ../../base/topologyupdater-daemonset
resources:
- namespace.yaml
components:
- ../../components/worker-config
- ../../components/common
- ../../components/topology-updater

@@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: node-feature-discovery

@@ -0,0 +1,18 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: node-feature-discovery
bases:
- ../../base/rbac
- ../../base/rbac-topologyupdater
- ../../base/master
- ../../base/noderesourcetopologies-crd
- ../../base/topologyupdater-job
resources:
- namespace.yaml
components:
- ../../components/common
- ../../components/topology-updater

@@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: node-feature-discovery

@@ -0,0 +1,16 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: node-feature-discovery
bases:
- ../../base/rbac-topologyupdater
- ../../base/worker-daemonset
- ../../base/noderesourcetopologies-crd
- ../../base/topologyupdater-daemonset
resources:
- namespace.yaml
components:
- ../../components/topology-updater

@@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: node-feature-discovery

go.mod

@@ -4,6 +4,11 @@ go 1.16
require (
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.5
github.com/jaypipes/ghw v0.8.1-0.20210827132705-c7224150a17e
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.0.10
github.com/klauspost/cpuid/v2 v2.0.9
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
@@ -12,12 +17,14 @@ require (
github.com/stretchr/testify v1.7.0
github.com/vektra/errors v0.0.0-20140903201135-c64d83aba85a
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.27.1
k8s.io/api v0.22.0
k8s.io/apimachinery v0.22.0
k8s.io/client-go v0.22.0
k8s.io/klog/v2 v2.9.0
k8s.io/kubelet v0.0.0
k8s.io/kubernetes v1.22.0
sigs.k8s.io/yaml v1.2.0
)

go.sum

@@ -72,6 +72,8 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -229,6 +231,7 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
@@ -245,6 +248,8 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v0.4.0 h1:K7/B1jt6fIBQVd4Owv2MqGQClcgf0R266+7C/QjRcLc=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
@@ -385,6 +390,11 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5/go.mod h1:DM4VvS+hD/kDi1U1QsX2fnZowwBhqD0Dk3bRPKF/Oc8=
github.com/jaypipes/ghw v0.8.1-0.20210827132705-c7224150a17e h1:XTXPzmyiwx2uxk8JaU4mxmBZ+rzZtmEwkNm9H9ETzV0=
github.com/jaypipes/ghw v0.8.1-0.20210827132705-c7224150a17e/go.mod h1:+gR9bjm3W/HnFi90liF+Fj9GpCe/Dsibl9Im8KmC7c4=
github.com/jaypipes/pcidb v0.6.0 h1:VIM7GKVaW4qba30cvB67xSCgJPTzkG8Kzw/cbs5PHWU=
github.com/jaypipes/pcidb v0.6.0/go.mod h1:L2RGk04sfRhp5wvHO0gfRAMoLY/F3PKv/nwJeVoho0o=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
@@ -406,6 +416,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.0.10 h1:wHS+TOQfFY67wkS1roZ5WVyihnE/IQmVsD0zzKtzHrU=
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.0.10/go.mod h1:yJo22okt35DQhvNw3Hgpaol6/oryET8Y5n1CJb9R5mM=
github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -452,6 +464,7 @@ github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible h1
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-wordwrap v1.0.0 h1:6GlHJ/LTGMrIJbwgdqdl2eEH8o+Exx/0m8ir9Gns0u4=
@@ -601,11 +614,13 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/cobra v1.1.3 h1:xghbfqPkxzxP3C/f3n5DdpAbdKLj4ZE4BWQI362l53M=
github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
@@ -876,8 +891,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7sNFinVFvkx1c8SjBkio=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
@@ -1072,6 +1088,8 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqpnJdvzuiHsl/dnxl11M=
howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0=
k8s.io/api v0.22.0 h1:elCpMZ9UE8dLdYxr55E06TmSeji9I3KH494qH70/y+c=
k8s.io/api v0.22.0/go.mod h1:0AoXXqst47OI/L0oGKq9DG61dvGRPXs7X4/B7KyjBCU=
k8s.io/apiextensions-apiserver v0.22.0 h1:QTuZIQggaE7N8FTjur+1zxLmEPziphK7nNm8t+VNO3g=

@@ -17,6 +17,7 @@ limitations under the License.
package apihelper
import (
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
api "k8s.io/api/core/v1"
k8sclient "k8s.io/client-go/kubernetes"
)
@@ -40,4 +41,10 @@ type APIHelpers interface {
// PatchNodeStatus updates the node status via the API server using a client.
PatchNodeStatus(*k8sclient.Clientset, string, []JsonPatch) error
// GetTopologyClient returns a topologyclientset
GetTopologyClient() (*topologyclientset.Clientset, error)
// GetPod returns the Kubernetes pod in a namespace with a name.
GetPod(*k8sclient.Clientset, string, string) (*api.Pod, error)
}

@@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
api "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -54,6 +55,27 @@ func (h K8sHelpers) GetClient() (*k8sclient.Clientset, error) {
return clientset, nil
}
func (h K8sHelpers) GetTopologyClient() (*topologyclientset.Clientset, error) {
// Set up a K8s client; the in-cluster config is used unless a kubeconfig file is given.
var config *restclient.Config
var err error
if h.Kubeconfig == "" {
config, err = restclient.InClusterConfig()
} else {
config, err = clientcmd.BuildConfigFromFlags("", h.Kubeconfig)
}
if err != nil {
return nil, err
}
topologyClient, err := topologyclientset.NewForConfig(config)
if err != nil {
return nil, err
}
return topologyClient, nil
}
func (h K8sHelpers) GetNode(cli *k8sclient.Clientset, nodeName string) (*api.Node, error) {
// Get the node object using node name
node, err := cli.CoreV1().Nodes().Get(context.TODO(), nodeName, meta_v1.GetOptions{})
@@ -100,3 +122,13 @@ func (h K8sHelpers) PatchNodeStatus(c *k8sclient.Clientset, nodeName string, pat
return nil
}
func (h K8sHelpers) GetPod(cli *k8sclient.Clientset, namespace string, podName string) (*api.Pod, error) {
// Get the pod object using namespace and pod name
pod, err := cli.CoreV1().Pods(namespace).Get(context.TODO(), podName, meta_v1.GetOptions{})
if err != nil {
return nil, err
}
return pod, nil
}
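As a usage sketch (hypothetical wiring, not part of this diff), the clientset returned by GetTopologyClient gives typed access to NodeResourceTopology objects, the same way nfd-master's updateCR below uses it:

h := apihelper.K8sHelpers{Kubeconfig: ""} // empty kubeconfig falls back to the in-cluster config
cli, err := h.GetTopologyClient()
if err != nil {
	klog.Exitf("failed to get topology client: %v", err)
}
nrt, err := cli.TopologyV1alpha1().NodeResourceTopologies("default").Get(context.TODO(), "worker-0", metav1.GetOptions{})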

@@ -9,6 +9,8 @@ import (
kubernetes "k8s.io/client-go/kubernetes"
v1 "k8s.io/api/core/v1"
versioned "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
)
// MockAPIHelpers is an autogenerated mock type for the APIHelpers type
@@ -85,6 +87,52 @@ func (_m *MockAPIHelpers) GetNodes(_a0 *kubernetes.Clientset) (*v1.NodeList, err
return r0, r1
}
// GetPod provides a mock function with given fields: _a0, _a1, _a2
func (_m *MockAPIHelpers) GetPod(_a0 *kubernetes.Clientset, _a1 string, _a2 string) (*v1.Pod, error) {
ret := _m.Called(_a0, _a1, _a2)
var r0 *v1.Pod
if rf, ok := ret.Get(0).(func(*kubernetes.Clientset, string, string) *v1.Pod); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v1.Pod)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*kubernetes.Clientset, string, string) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetTopologyClient provides a mock function with given fields:
func (_m *MockAPIHelpers) GetTopologyClient() (*versioned.Clientset, error) {
ret := _m.Called()
var r0 *versioned.Clientset
if rf, ok := ret.Get(0).(func() *versioned.Clientset); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*versioned.Clientset)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
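// Usage sketch (illustrative only; assumes a test built on testify's mock package):
//
//	mockAPI := new(apihelper.MockAPIHelpers)
//	mockAPI.On("GetPod", mock.Anything, "default", "pod-0").Return(&v1.Pod{}, nil)
//	pod, err := mockAPI.GetPod(nil, "default", "pod-0")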
// PatchNode provides a mock function with given fields: _a0, _a1, _a2
func (_m *MockAPIHelpers) PatchNode(_a0 *kubernetes.Clientset, _a1 string, _a2 []JsonPatch) error {
ret := _m.Called(_a0, _a1, _a2)

@@ -0,0 +1,39 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeconf
import (
"io/ioutil"
"github.com/ghodss/yaml"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
)
// GetKubeletConfigFromLocalFile returns KubeletConfiguration loaded from the node local config
func GetKubeletConfigFromLocalFile(kubeletConfigPath string) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
kubeletBytes, err := ioutil.ReadFile(kubeletConfigPath)
if err != nil {
return nil, err
}
kubeletConfig := &kubeletconfigv1beta1.KubeletConfiguration{}
if err := yaml.Unmarshal(kubeletBytes, kubeletConfig); err != nil {
return nil, err
}
return kubeletConfig, nil
}
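A minimal consumer of this helper, mirroring how nfd-topology-updater's main() above pairs it with the topologypolicy package:

cfg, err := kubeconf.GetKubeletConfigFromLocalFile("/var/lib/kubelet/config.yaml")
if err != nil {
	klog.Fatalf("error reading kubelet config: %v", err)
}
// Derive the NRT policy string from the kubelet Topology Manager settings
tmPolicy := string(topologypolicy.DetectTopologyPolicy(cfg.TopologyManagerPolicy, cfg.TopologyManagerScope))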

@@ -0,0 +1,46 @@
/*
Copyright 2019-2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeconf
import (
"path/filepath"
"testing"
)
type testCaseData struct {
path string
tmPolicy string
}
func TestGetKubeletConfigFromLocalFile(t *testing.T) {
tCases := []testCaseData{
{
path: filepath.Join("..", "..", "test", "data", "kubeletconf.yaml"),
tmPolicy: "single-numa-node",
},
}
for _, tCase := range tCases {
cfg, err := GetKubeletConfigFromLocalFile(tCase.path)
if err != nil {
t.Errorf("failed to read config from %q: %v", tCase.path, err)
}
if cfg.TopologyManagerPolicy != tCase.tmPolicy {
t.Errorf("TM policy mismatch, found %q expected %q", cfg.TopologyManagerPolicy, tCase.tmPolicy)
}
}
}

@@ -55,7 +55,11 @@ type Args struct {
Klog map[string]*utils.KlogFlagVal
}
var nodeName = os.Getenv("NODE_NAME")
var nodeName string
func init() {
nodeName = os.Getenv("NODE_NAME")
}
// NodeName returns the name of the k8s node we're running on.
func NodeName() string { return nodeName }

@@ -0,0 +1,246 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologyupdater
import (
"fmt"
"time"
"k8s.io/klog/v2"
v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"golang.org/x/net/context"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/podres"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
pb "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)
// Command line arguments
type Args struct {
nfdclient.Args
NoPublish bool
Oneshot bool
KubeConfigFile string
}
type NfdTopologyUpdater interface {
nfdclient.NfdClient
Update(v1alpha1.ZoneList) error
}
type staticNodeInfo struct {
tmPolicy string
}
type nfdTopologyUpdater struct {
nfdclient.NfdBaseClient
nodeInfo *staticNodeInfo
args Args
resourcemonitorArgs resourcemonitor.Args
certWatch *utils.FsWatcher
client pb.NodeTopologyClient
stop chan struct{} // channel for signaling stop
}
// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy string) (NfdTopologyUpdater, error) {
base, err := nfdclient.NewNfdBaseClient(&args.Args)
if err != nil {
return nil, err
}
nfd := &nfdTopologyUpdater{
NfdBaseClient: base,
args: args,
resourcemonitorArgs: resourcemonitorArgs,
nodeInfo: &staticNodeInfo{
tmPolicy: policy,
},
stop: make(chan struct{}, 1),
}
return nfd, nil
}
// Run nfdTopologyUpdater client. Returns if a fatal error is encountered, or after
// one request if Oneshot is set to 'true' in the updater args.
func (w *nfdTopologyUpdater) Run() error {
klog.Infof("Node Feature Discovery Topology Updater %s", version.Get())
klog.Infof("NodeName: '%s'", nfdclient.NodeName())
podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath)
if err != nil {
klog.Fatalf("failed to get PodResource Client: %w", err)
return err
}
kubeApihelper := apihelper.K8sHelpers{Kubeconfig: w.args.KubeConfigFile}
var resScan resourcemonitor.ResourcesScanner
resScan, err = resourcemonitor.NewPodResourcesScanner(w.resourcemonitorArgs.Namespace, podResClient, kubeApihelper)
if err != nil {
klog.Fatalf("failed to initialize ResourceMonitor instance: %w", err)
return err
}
// CAUTION: these resources are expected to change rarely - if ever.
// So we intentionally do this only once during the process lifecycle.
// TODO: Obtain node resources dynamically from the podresource API
// zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha1.ZoneList
resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient)
if err != nil {
klog.Fatalf("failed to obtain node resource information: %w", err)
return err
}
klog.V(2).Infof("resAggr is: %v\n", resAggr)
// Create watcher for TLS certificates
w.certWatch, err = utils.CreateFsWatcher(time.Second, w.args.CaFile, w.args.CertFile, w.args.KeyFile)
if err != nil {
return err
}
crTrigger := time.After(0)
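// A zero duration fires the first scan immediately; after each successful
// update the timer is re-armed with -sleep-interval. A non-positive interval
// means the loop waits indefinitely (or returns after one pass in oneshot mode).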
for {
select {
case <-crTrigger:
klog.Infof("Scanning\n")
podResources, err := resScan.Scan()
utils.KlogDump(1, "podResources are", " ", podResources)
if err != nil {
klog.Warningf("Scan failed: %v\n", err)
continue
}
zones = resAggr.Aggregate(podResources)
utils.KlogDump(1, "After aggregating resources identified zones are", " ", zones)
if err = w.Update(zones); err != nil {
return err
}
if w.args.Oneshot {
return nil
}
if w.resourcemonitorArgs.SleepInterval > 0 {
crTrigger = time.After(w.resourcemonitorArgs.SleepInterval)
}
case <-w.certWatch.Events:
klog.Infof("TLS certificate update, renewing connection to nfd-master")
w.Disconnect()
if err := w.Connect(); err != nil {
return err
}
case <-w.stop:
klog.Infof("shutting down nfd-topology-updater")
w.certWatch.Close()
return nil
}
}
}
func (w *nfdTopologyUpdater) Update(zones v1alpha1.ZoneList) error {
// Connect to NFD master
err := w.Connect()
if err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
defer w.Disconnect()
if w.client == nil {
return nil
}
err = advertiseNodeTopology(w.client, zones, w.nodeInfo.tmPolicy, nfdclient.NodeName())
if err != nil {
return fmt.Errorf("failed to advertise node topology: %w", err)
}
return nil
}
// Stop NFD Topology Updater
func (w *nfdTopologyUpdater) Stop() {
select {
case w.stop <- struct{}{}:
default:
}
}
// Connect creates a client connection to the NFD master
func (w *nfdTopologyUpdater) Connect() error {
// Return a dummy connection in case of dry-run
if w.args.NoPublish {
return nil
}
if err := w.NfdBaseClient.Connect(); err != nil {
return err
}
w.client = pb.NewNodeTopologyClient(w.ClientConn())
return nil
}
// Disconnect closes the connection to NFD master
func (w *nfdTopologyUpdater) Disconnect() {
w.NfdBaseClient.Disconnect()
w.client = nil
}
// advertiseNodeTopology advertises the topology CR to a Kubernetes node
// via the NFD server.
func advertiseNodeTopology(client pb.NodeTopologyClient, zoneInfo v1alpha1.ZoneList, tmPolicy string, nodeName string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
zones := make([]*v1alpha1.Zone, len(zoneInfo))
// TODO: Avoid copying of data to allow returning the zone info
// directly in a compatible data type (i.e. []*v1alpha1.Zone).
for i, zone := range zoneInfo {
zones[i] = &v1alpha1.Zone{
Name: zone.Name,
Type: zone.Type,
Parent: zone.Parent,
Resources: zone.Resources,
Costs: zone.Costs,
}
}
topologyReq := &pb.NodeTopologyRequest{
Zones: zones,
NfdVersion: version.Get(),
NodeName: nodeName,
TopologyPolicies: []string{tmPolicy},
}
utils.KlogDump(1, "Sending NodeTopologyRequest to nfd-master:", " ", topologyReq)
_, err := client.UpdateNodeTopology(ctx, topologyReq)
if err != nil {
return err
}
return nil
}

@@ -0,0 +1,169 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologyupdater_test
import (
"fmt"
"os"
"testing"
"time"
v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
. "github.com/smartystreets/goconvey/convey"
"k8s.io/apimachinery/pkg/api/resource"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
u "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/topology-updater"
nfdmaster "sigs.k8s.io/node-feature-discovery/pkg/nfd-master"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/test/data"
)
type testContext struct {
master nfdmaster.NfdMaster
errs chan error
}
func setupTest(args *nfdmaster.Args) testContext {
// Fixed port and no-publish, for convenience
args.NoPublish = true
args.Port = 8192
m, err := nfdmaster.NewNfdMaster(args)
if err != nil {
fmt.Printf("Test setup failed: %v\n", err)
os.Exit(1)
}
ctx := testContext{master: m, errs: make(chan error)}
// Run nfd-master instance, intended to be used as the server counterpart
go func() {
ctx.errs <- ctx.master.Run()
close(ctx.errs)
}()
ready := ctx.master.WaitForReady(time.Second)
if !ready {
fmt.Println("Test setup failed: timeout while waiting for nfd-master")
os.Exit(1)
}
return ctx
}
func teardownTest(ctx testContext) {
ctx.master.Stop()
for e := range ctx.errs {
if e != nil {
fmt.Printf("Error in test context: %v\n", e)
os.Exit(1)
}
}
}
func TestNewTopologyUpdater(t *testing.T) {
Convey("When initializing new NfdTopologyUpdater instance", t, func() {
Convey("When one of --cert-file, --key-file or --ca-file is missing", func() {
tmPolicy := "fake-topology-manager-policy"
_, err := u.NewTopologyUpdater(u.Args{Args: nfdclient.Args{CertFile: "crt", KeyFile: "key"}}, resourcemonitor.Args{}, tmPolicy)
_, err2 := u.NewTopologyUpdater(u.Args{Args: nfdclient.Args{KeyFile: "key", CaFile: "ca"}}, resourcemonitor.Args{}, tmPolicy)
_, err3 := u.NewTopologyUpdater(u.Args{Args: nfdclient.Args{CertFile: "crt", CaFile: "ca"}}, resourcemonitor.Args{}, tmPolicy)
Convey("An error should be returned", func() {
So(err, ShouldNotBeNil)
So(err2, ShouldNotBeNil)
So(err3, ShouldNotBeNil)
})
})
})
}
func TestUpdate(t *testing.T) {
ctx := setupTest(&nfdmaster.Args{})
resourceInfo := v1alpha1.ResourceInfoList{
v1alpha1.ResourceInfo{
Name: "cpu",
Available: resource.MustParse("2"),
Allocatable: resource.MustParse("4"),
Capacity: resource.MustParse("4"),
},
}
zones := v1alpha1.ZoneList{
v1alpha1.Zone{
Name: "node-0",
Type: "Node",
Resources: resourceInfo,
},
}
defer teardownTest(ctx)
Convey("When running nfd-topology-updater against nfd-master", t, func() {
Convey("When running as a Oneshot job with Zones", func() {
args := u.Args{
Oneshot: true,
Args: nfdclient.Args{
Server: "localhost:8192"},
}
updater, _ := u.NewTopologyUpdater(args, resourcemonitor.Args{}, "fake-topology-manager-policy")
err := updater.Update(zones)
Convey("No error should be returned", func() {
So(err, ShouldBeNil)
})
})
})
}
func TestRunTls(t *testing.T) {
masterArgs := &nfdmaster.Args{
CaFile: data.FilePath("ca.crt"),
CertFile: data.FilePath("nfd-test-master.crt"),
KeyFile: data.FilePath("nfd-test-master.key"),
VerifyNodeName: false,
}
ctx := setupTest(masterArgs)
defer teardownTest(ctx)
Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() {
Convey("When publishing CRs obtained from Zones", func() {
resourceInfo := v1alpha1.ResourceInfoList{
v1alpha1.ResourceInfo{
Name: "cpu",
Available: resource.MustParse("2"),
Allocatable: resource.MustParse("4"),
Capacity: resource.MustParse("4"),
},
}
zones := v1alpha1.ZoneList{
v1alpha1.Zone{
Name: "node-0",
Type: "Node",
Resources: resourceInfo,
},
}
updaterArgs := u.Args{
Args: nfdclient.Args{
CaFile: data.FilePath("ca.crt"),
CertFile: data.FilePath("nfd-test-topology-updater.crt"),
KeyFile: data.FilePath("nfd-test-topology-updater.key"),
Server: "localhost:8192",
ServerNameOverride: "nfd-test-master",
},
Oneshot: true,
}
updater, _ := u.NewTopologyUpdater(updaterArgs, resourcemonitor.Args{}, "fake-topology-manager-policy")
err := updater.Update(zones)
Convey("No error should be returned", func() {
So(err, ShouldBeNil)
})
})
})
}

@@ -29,17 +29,21 @@ import (
"strings"
"time"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
api "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/peer"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
pb "sigs.k8s.io/node-feature-discovery/pkg/labeler"
topologypb "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)
@@ -90,6 +94,7 @@ type Args struct {
Prune bool
VerifyNodeName bool
ResourceLabels utils.StringSetVal
NRTNamespace string
}
type NfdMaster interface {
@@ -195,6 +200,7 @@ func (m *nfdMaster) Run() error {
m.server = grpc.NewServer(serverOpts...)
pb.RegisterLabelerServer(m.server, m)
grpc_health_v1.RegisterHealthServer(m.server, health.NewServer())
topologypb.RegisterNodeTopologyServer(m.server, m)
klog.Infof("gRPC server serving on port: %d", m.args.Port)
// Run gRPC server
@@ -375,29 +381,9 @@ func verifyNodeName(cert *x509.Certificate, nodeName string) error {
// SetLabels implements LabelerServer
func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.SetLabelsReply, error) {
if m.args.VerifyNodeName {
// Client authorization.
// Check that the node name matches the CN from the TLS cert
client, ok := peer.FromContext(c)
if !ok {
klog.Errorf("gRPC request error: failed to get peer (client)")
return &pb.SetLabelsReply{}, fmt.Errorf("failed to get peer (client)")
}
tlsAuth, ok := client.AuthInfo.(credentials.TLSInfo)
if !ok {
klog.Errorf("gRPC request error: incorrect client credentials from '%v'", client.Addr)
return &pb.SetLabelsReply{}, fmt.Errorf("incorrect client credentials")
}
if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
klog.Errorf("gRPC request error: client certificate verification for '%v' failed", client.Addr)
return &pb.SetLabelsReply{}, fmt.Errorf("client certificate verification failed")
}
err := verifyNodeName(tlsAuth.State.VerifiedChains[0][0], r.NodeName)
if err != nil {
klog.Errorf("gRPC request error: authorization for %v failed: %v", client.Addr, err)
return &pb.SetLabelsReply{}, err
}
err := authorizeClient(c, m.args.VerifyNodeName, r.NodeName)
if err != nil {
return &pb.SetLabelsReply{}, err
}
if klog.V(1).Enabled() {
klog.Infof("REQUEST Node: %q NFD-version: %q Labels: %s", r.NodeName, r.NfdVersion, r.Labels)
@@ -421,6 +407,55 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
return &pb.SetLabelsReply{}, nil
}
func authorizeClient(c context.Context, checkNodeName bool, nodeName string) error {
if checkNodeName {
// Client authorization.
// Check that the node name matches the CN from the TLS cert
client, ok := peer.FromContext(c)
if !ok {
klog.Errorf("gRPC request error: failed to get peer (client)")
return fmt.Errorf("failed to get peer (client)")
}
tlsAuth, ok := client.AuthInfo.(credentials.TLSInfo)
if !ok {
klog.Errorf("gRPC request error: incorrect client credentials from '%v'", client.Addr)
return fmt.Errorf("incorrect client credentials")
}
if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
klog.Errorf("gRPC request error: client certificate verification for '%v' failed", client.Addr)
return fmt.Errorf("client certificate verification failed")
}
err := verifyNodeName(tlsAuth.State.VerifiedChains[0][0], nodeName)
if err != nil {
klog.Errorf("gRPC request error: authorization for %v failed: %v", client.Addr, err)
return err
}
}
return nil
}
func (m *nfdMaster) UpdateNodeTopology(c context.Context, r *topologypb.NodeTopologyRequest) (*topologypb.NodeTopologyResponse, error) {
err := authorizeClient(c, m.args.VerifyNodeName, r.NodeName)
if err != nil {
return &topologypb.NodeTopologyResponse{}, err
}
if klog.V(1).Enabled() {
klog.Infof("REQUEST Node: %s NFD-version: %s Topology Policy: %s", r.NodeName, r.NfdVersion, r.TopologyPolicies)
utils.KlogDump(1, "Zones received:", " ", r.Zones)
} else {
klog.Infof("received CR updation request for node %q", r.NodeName)
}
if !m.args.NoPublish {
err := m.updateCR(r.NodeName, r.TopologyPolicies, r.Zones, m.args.NRTNamespace)
if err != nil {
klog.Errorf("failed to advertise NodeResourceTopology: %w", err)
return &topologypb.NodeTopologyResponse{}, err
}
}
return &topologypb.NodeTopologyResponse{}, nil
}
// updateNodeFeatures ensures the Kubernetes node object is up to date,
// creating new labels and extended resources where necessary and removing
// outdated ones. Also updates the corresponding annotations.
@@ -590,3 +625,57 @@ func stringToNsNames(cslist, ns string) []string {
}
return names
}
func modifyCR(topoUpdaterZones []*v1alpha1.Zone) []v1alpha1.Zone {
zones := make([]v1alpha1.Zone, len(topoUpdaterZones))
// TODO: Avoid copying of data to allow returning the zone info
// directly in a compatible data type (i.e. []*v1alpha1.Zone).
for i, zone := range topoUpdaterZones {
zones[i] = v1alpha1.Zone{
Name: zone.Name,
Type: zone.Type,
Parent: zone.Parent,
Costs: zone.Costs,
Resources: zone.Resources,
}
}
return zones
}
func (m *nfdMaster) updateCR(hostname string, tmpolicy []string, topoUpdaterZones []*v1alpha1.Zone, namespace string) error {
cli, err := m.apihelper.GetTopologyClient()
if err != nil {
return err
}
zones := modifyCR(topoUpdaterZones)
nrt, err := cli.TopologyV1alpha1().NodeResourceTopologies(namespace).Get(context.TODO(), hostname, metav1.GetOptions{})
if errors.IsNotFound(err) {
nrtNew := v1alpha1.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: hostname,
},
Zones: zones,
TopologyPolicies: tmpolicy,
}
_, err := cli.TopologyV1alpha1().NodeResourceTopologies(namespace).Create(context.TODO(), &nrtNew, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create v1alpha1.NodeResourceTopology!:%w", err)
}
return nil
} else if err != nil {
return err
}
nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zones
nrtUpdated, err := cli.TopologyV1alpha1().NodeResourceTopologies(namespace).Update(context.TODO(), nrtMutated, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update v1alpha1.NodeResourceTopology!:%w", err)
}
utils.KlogDump(2, "CR instance updated resTopo:", " ", nrtUpdated)
return nil
}

pkg/podres/client.go (new file)

@@ -0,0 +1,41 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podres
import (
"fmt"
"log"
"time"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
)
const (
// obtained the following values from node e2e tests : https://github.com/kubernetes/kubernetes/blob/82baa26905c94398a0d19e1b1ecf54eb8acb6029/test/e2e_node/util.go#L70
defaultPodResourcesTimeout = 10 * time.Second
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb
)
func GetPodResClient(socketPath string) (podresourcesapi.PodResourcesListerClient, error) {
podResourceClient, _, err := podresources.GetV1Client(socketPath, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
if err != nil {
return nil, fmt.Errorf("failed to create podresource client: %w", err)
}
log.Printf("Connected to '%q'!", socketPath)
return podResourceClient, nil
}
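A usage sketch (the socket path shown is the default this PR assumes elsewhere):

cli, err := podres.GetPodResClient("/var/lib/kubelet/pod-resources/kubelet.sock")
if err != nil {
	klog.Fatalf("failed to create podresource client: %v", err)
}
// Query the kubelet for device/CPU resources that are allocatable per NUMA node
resp, err := cli.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{})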

@@ -0,0 +1,78 @@
// Code generated by mockery v2.4.0-beta. DO NOT EDIT.
package podres
import (
context "context"
grpc "google.golang.org/grpc"
mock "github.com/stretchr/testify/mock"
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)
// MockPodResourcesListerClient is an autogenerated mock type for the PodResourcesListerClient type
type MockPodResourcesListerClient struct {
mock.Mock
}
// GetAllocatableResources provides a mock function with given fields: ctx, in, opts
func (_m *MockPodResourcesListerClient) GetAllocatableResources(ctx context.Context, in *v1.AllocatableResourcesRequest, opts ...grpc.CallOption) (*v1.AllocatableResourcesResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *v1.AllocatableResourcesResponse
if rf, ok := ret.Get(0).(func(context.Context, *v1.AllocatableResourcesRequest, ...grpc.CallOption) *v1.AllocatableResourcesResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v1.AllocatableResourcesResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1.AllocatableResourcesRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// List provides a mock function with given fields: ctx, in, opts
func (_m *MockPodResourcesListerClient) List(ctx context.Context, in *v1.ListPodResourcesRequest, opts ...grpc.CallOption) (*v1.ListPodResourcesResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *v1.ListPodResourcesResponse
if rf, ok := ret.Get(0).(func(context.Context, *v1.ListPodResourcesRequest, ...grpc.CallOption) *v1.ListPodResourcesResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v1.ListPodResourcesResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1.ListPodResourcesRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

@@ -0,0 +1,332 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"context"
"fmt"
"time"
"github.com/jaypipes/ghw"
topologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"sigs.k8s.io/node-feature-discovery/source"
)
const (
// obtained these values from node e2e tests : https://github.com/kubernetes/kubernetes/blob/82baa26905c94398a0d19e1b1ecf54eb8acb6029/test/e2e_node/util.go#L70
defaultPodResourcesTimeout = 10 * time.Second
)
type nodeResources struct {
perNUMAAllocatable map[int]map[v1.ResourceName]int64
// mapping: resourceName -> resourceID -> nodeID
resourceID2NUMAID map[string]map[string]int
topo *ghw.TopologyInfo
reservedCPUIDPerNUMA map[int][]string
}
type resourceData struct {
available int64
allocatable int64
capacity int64
}
func NewResourcesAggregator(podResourceClient podresourcesapi.PodResourcesListerClient) (ResourcesAggregator, error) {
var err error
topo, err := ghw.Topology(ghw.WithPathOverrides(ghw.PathOverrides{
"/sys": string(source.SysfsDir),
}))
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), defaultPodResourcesTimeout)
defer cancel()
// Pod Resource API client
resp, err := podResourceClient.GetAllocatableResources(ctx, &podresourcesapi.AllocatableResourcesRequest{})
if err != nil {
return nil, fmt.Errorf("can't receive response: %v.Get(_) = _, %w", podResourceClient, err)
}
return NewResourcesAggregatorFromData(topo, resp), nil
}
// NewResourcesAggregatorFromData aggregates resource information based on data received from the underlying hardware and the podresources API
func NewResourcesAggregatorFromData(topo *ghw.TopologyInfo, resp *podresourcesapi.AllocatableResourcesResponse) ResourcesAggregator {
allDevs := getContainerDevicesFromAllocatableResources(resp, topo)
return &nodeResources{
topo: topo,
resourceID2NUMAID: makeResourceMap(len(topo.Nodes), allDevs),
perNUMAAllocatable: makeNodeAllocatable(allDevs),
reservedCPUIDPerNUMA: makeReservedCPUMap(topo.Nodes, allDevs),
}
}
// Aggregate provides the mapping (numa zone name) -> Zone from the given PodResources.
func (noderesourceData *nodeResources) Aggregate(podResData []PodResources) topologyv1alpha1.ZoneList {
perNuma := make(map[int]map[v1.ResourceName]*resourceData)
for nodeID := range noderesourceData.topo.Nodes {
nodeRes, ok := noderesourceData.perNUMAAllocatable[nodeID]
if ok {
perNuma[nodeID] = make(map[v1.ResourceName]*resourceData)
for resName, resCap := range nodeRes {
if resName == "cpu" {
perNuma[nodeID][resName] = &resourceData{
allocatable: resCap,
available: resCap,
capacity: resCap + int64(len(noderesourceData.reservedCPUIDPerNUMA[nodeID])),
}
} else {
perNuma[nodeID][resName] = &resourceData{
allocatable: resCap,
available: resCap,
capacity: resCap,
}
}
}
// The NUMA node doesn't have any allocatable resources, yet it exists in the topology;
// thus all its CPUs are considered reserved
} else {
perNuma[nodeID] = make(map[v1.ResourceName]*resourceData)
perNuma[nodeID]["cpu"] = &resourceData{
allocatable: int64(0),
available: int64(0),
capacity: int64(len(noderesourceData.reservedCPUIDPerNUMA[nodeID])),
}
}
}
for _, podRes := range podResData {
for _, contRes := range podRes.Containers {
for _, res := range contRes.Resources {
noderesourceData.updateAvailable(perNuma, res)
}
}
}
zones := make(topologyv1alpha1.ZoneList, 0)
for nodeID, resList := range perNuma {
zone := topologyv1alpha1.Zone{
Name: makeZoneName(nodeID),
Type: "Node",
Resources: make(topologyv1alpha1.ResourceInfoList, 0),
}
costs, err := makeCostsPerNumaNode(noderesourceData.topo.Nodes, nodeID)
if err != nil {
klog.Infof("cannot find costs for NUMA node %d: %v", nodeID, err)
} else {
zone.Costs = topologyv1alpha1.CostList(costs)
}
for name, resData := range resList {
allocatableQty := *resource.NewQuantity(resData.allocatable, resource.DecimalSI)
capacityQty := *resource.NewQuantity(resData.capacity, resource.DecimalSI)
availableQty := *resource.NewQuantity(resData.available, resource.DecimalSI)
zone.Resources = append(zone.Resources, topologyv1alpha1.ResourceInfo{
Name: name.String(),
Available: availableQty,
Allocatable: allocatableQty,
Capacity: capacityQty,
})
}
zones = append(zones, zone)
}
return zones
}
// getContainerDevicesFromAllocatableResources normalizes all compute resources to ContainerDevices.
// This is helpful because cpuIDs are not represented as ContainerDevices, but with a different format;
// having a consistent representation of all the resources as ContainerDevices makes the rest of the processing simpler.
func getContainerDevicesFromAllocatableResources(availRes *podresourcesapi.AllocatableResourcesResponse, topo *ghw.TopologyInfo) []*podresourcesapi.ContainerDevices {
var contDevs []*podresourcesapi.ContainerDevices
contDevs = append(contDevs, availRes.GetDevices()...)
cpuIDToNodeIDMap := MakeLogicalCoreIDToNodeIDMap(topo)
cpusPerNuma := make(map[int][]string)
for _, cpuID := range availRes.GetCpuIds() {
nodeID, ok := cpuIDToNodeIDMap[int(cpuID)]
if !ok {
klog.Infof("cannot find the NUMA node for CPU %d", cpuID)
continue
}
cpuIDList := cpusPerNuma[nodeID]
cpuIDList = append(cpuIDList, fmt.Sprintf("%d", cpuID))
cpusPerNuma[nodeID] = cpuIDList
}
for nodeID, cpuList := range cpusPerNuma {
contDevs = append(contDevs, &podresourcesapi.ContainerDevices{
ResourceName: string(v1.ResourceCPU),
DeviceIds: cpuList,
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{ID: int64(nodeID)},
},
},
})
}
return contDevs
}
// updateAvailable computes the actually available resources.
// This function assumes the available resources are initialized to be equal to the allocatable.
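// For example (illustrative numbers): if a NUMA node starts with 12 allocatable CPUs
// and a container has two exclusive CPUs assigned from that node, the node's available
// CPU count drops to 10 while its allocatable and capacity stay unchanged.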
func (noderesourceData *nodeResources) updateAvailable(numaData map[int]map[v1.ResourceName]*resourceData, ri ResourceInfo) {
for _, resID := range ri.Data {
resName := string(ri.Name)
resMap, ok := noderesourceData.resourceID2NUMAID[resName]
if !ok {
klog.Infof("unknown resource %q", ri.Name)
continue
}
nodeID, ok := resMap[resID]
if !ok {
klog.Infof("unknown resource %q: %q", resName, resID)
continue
}
numaData[nodeID][ri.Name].available--
}
}
// makeZoneName returns the canonical name of a NUMA zone from its ID.
func makeZoneName(nodeID int) string {
return fmt.Sprintf("node-%d", nodeID)
}
// makeNodeAllocatable computes the node allocatable as the mapping (NUMA node ID) -> Resource -> Allocatable (amount, int).
// The computation assumes that all the resources making up the allocatable are represented in a slice
// of ContainerDevices, with no special treatment for CPU IDs. See getContainerDevicesFromAllocatableResources.
func makeNodeAllocatable(devices []*podresourcesapi.ContainerDevices) map[int]map[v1.ResourceName]int64 {
perNUMAAllocatable := make(map[int]map[v1.ResourceName]int64)
// initialize with the capacities
for _, device := range devices {
resourceName := device.GetResourceName()
for _, node := range device.GetTopology().GetNodes() {
nodeID := int(node.GetID())
nodeRes, ok := perNUMAAllocatable[nodeID]
if !ok {
nodeRes = make(map[v1.ResourceName]int64)
}
nodeRes[v1.ResourceName(resourceName)] += int64(len(device.GetDeviceIds()))
perNUMAAllocatable[nodeID] = nodeRes
}
}
return perNUMAAllocatable
}
func MakeLogicalCoreIDToNodeIDMap(topo *ghw.TopologyInfo) map[int]int {
core2node := make(map[int]int)
for _, node := range topo.Nodes {
for _, core := range node.Cores {
for _, procID := range core.LogicalProcessors {
core2node[procID] = node.ID
}
}
}
return core2node
}
// makeResourceMap creates the mapping (resource name) -> (device ID) -> (NUMA node ID) from the given slice of ContainerDevices.
// This is useful to quickly look up the NUMA node ID of a given (resource, device) pair.
func makeResourceMap(numaNodes int, devices []*podresourcesapi.ContainerDevices) map[string]map[string]int {
resourceMap := make(map[string]map[string]int)
for _, device := range devices {
resourceName := device.GetResourceName()
_, ok := resourceMap[resourceName]
if !ok {
resourceMap[resourceName] = make(map[string]int)
}
for _, node := range device.GetTopology().GetNodes() {
nodeID := int(node.GetID())
for _, deviceID := range device.GetDeviceIds() {
resourceMap[resourceName][deviceID] = nodeID
}
}
}
return resourceMap
}
// makeCostsPerNumaNode builds the cost map to reach all the known NUMA zones (mapping (numa zone) -> cost) starting from the given NUMA zone.
func makeCostsPerNumaNode(nodes []*ghw.TopologyNode, nodeIDSrc int) ([]topologyv1alpha1.CostInfo, error) {
nodeSrc := findNodeByID(nodes, nodeIDSrc)
if nodeSrc == nil {
return nil, fmt.Errorf("unknown node: %d", nodeIDSrc)
}
nodeCosts := make([]topologyv1alpha1.CostInfo, 0)
for nodeIDDst, dist := range nodeSrc.Distances {
// TODO: this assumes there are no holes (= no offline node) in the distance vector
nodeCosts = append(nodeCosts, topologyv1alpha1.CostInfo{
Name: makeZoneName(nodeIDDst),
Value: int64(dist),
})
}
return nodeCosts, nil
}
func findNodeByID(nodes []*ghw.TopologyNode, nodeID int) *ghw.TopologyNode {
for _, node := range nodes {
if node.ID == nodeID {
return node
}
}
return nil
}
func makeReservedCPUMap(nodes []*ghw.TopologyNode, devices []*podresourcesapi.ContainerDevices) map[int][]string {
reservedCPUsPerNuma := make(map[int][]string)
cpus := getCPUs(devices)
for _, node := range nodes {
nodeID := node.ID
for _, core := range node.Cores {
for _, cpu := range core.LogicalProcessors {
cpuID := fmt.Sprintf("%d", cpu)
_, ok := cpus[cpuID]
if !ok {
cpuIDList, ok := reservedCPUsPerNuma[nodeID]
if !ok {
cpuIDList = make([]string, 0)
}
cpuIDList = append(cpuIDList, cpuID)
reservedCPUsPerNuma[nodeID] = cpuIDList
}
}
}
}
return reservedCPUsPerNuma
}
func getCPUs(devices []*podresourcesapi.ContainerDevices) map[string]int {
cpuMap := make(map[string]int)
for _, device := range devices {
if device.GetResourceName() == "cpu" {
for _, devId := range device.DeviceIds {
cpuMap[devId] = int(device.Topology.Nodes[0].ID)
}
}
}
return cpuMap
}

View file

@ -0,0 +1,511 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"encoding/json"
"log"
"sort"
"testing"
"github.com/jaypipes/ghw"
cmp "github.com/google/go-cmp/cmp"
. "github.com/smartystreets/goconvey/convey"
"k8s.io/apimachinery/pkg/api/resource"
topologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)
func TestResourcesAggregator(t *testing.T) {
fakeTopo := ghw.TopologyInfo{}
Convey("When recovering test topology from JSON data", t, func() {
err := json.Unmarshal([]byte(testTopology), &fakeTopo)
So(err, ShouldBeNil)
})
var resAggr ResourcesAggregator
Convey("When I aggregate the node resources fake data and no pod allocation", t, func() {
availRes := &v1.AllocatableResourcesResponse{
Devices: []*v1.ContainerDevices{
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA-0"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA-1"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA-2"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA-3"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netBBB-0"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netBBB-1"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/gpu",
DeviceIds: []string{"gpuAAA"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
},
// CPUId 0 and 1 are missing from the list below to simulate
// that they are not allocatable CPUs (kube-reserved or system-reserved)
CpuIds: []int64{
2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
},
}
resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes)
Convey("When aggregating resources", func() {
expected := topologyv1alpha1.ZoneList{
topologyv1alpha1.Zone{
Name: "node-0",
Type: "Node",
Costs: topologyv1alpha1.CostList{
topologyv1alpha1.CostInfo{
Name: "node-0",
Value: 10,
},
topologyv1alpha1.CostInfo{
Name: "node-1",
Value: 20,
},
},
Resources: topologyv1alpha1.ResourceInfoList{
topologyv1alpha1.ResourceInfo{
Name: "cpu",
Available: resource.MustParse("11"),
Allocatable: resource.MustParse("11"),
Capacity: resource.MustParse("12"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/net",
Available: resource.MustParse("4"),
Allocatable: resource.MustParse("4"),
Capacity: resource.MustParse("4"),
},
},
},
topologyv1alpha1.Zone{
Name: "node-1",
Type: "Node",
Costs: topologyv1alpha1.CostList{
topologyv1alpha1.CostInfo{
Name: "node-0",
Value: 20,
},
topologyv1alpha1.CostInfo{
Name: "node-1",
Value: 10,
},
},
Resources: topologyv1alpha1.ResourceInfoList{
topologyv1alpha1.ResourceInfo{
Name: "cpu",
Available: resource.MustParse("11"),
Allocatable: resource.MustParse("11"),
Capacity: resource.MustParse("12"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/gpu",
Available: resource.MustParse("1"),
Allocatable: resource.MustParse("1"),
Capacity: resource.MustParse("1"),
},
topologyv1alpha1.ResourceInfo{
// node-1 only holds the two netBBB devices (netBBB-0, netBBB-1)
Name: "fake.io/net",
Available: resource.MustParse("2"),
Allocatable: resource.MustParse("2"),
Capacity: resource.MustParse("2"),
},
},
},
}
res := resAggr.Aggregate(nil) // no pods allocation
sort.Slice(res, func(i, j int) bool {
return res[i].Name < res[j].Name
})
for _, resource := range res {
sort.Slice(resource.Costs, func(x, y int) bool {
return resource.Costs[x].Name < resource.Costs[y].Name
})
}
for _, resource := range res {
sort.Slice(resource.Resources, func(x, y int) bool {
return resource.Resources[x].Name < resource.Resources[y].Name
})
}
log.Printf("result=%v", res)
log.Printf("expected=%v", expected)
log.Printf("diff=%s", cmp.Diff(res, expected))
So(cmp.Equal(res, expected), ShouldBeTrue)
})
})
Convey("When I aggregate the node resources fake data and some pod allocation", t, func() {
availRes := &v1.AllocatableResourcesResponse{
Devices: []*v1.ContainerDevices{
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netAAA"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 0,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/net",
DeviceIds: []string{"netBBB"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
&v1.ContainerDevices{
ResourceName: "fake.io/gpu",
DeviceIds: []string{"gpuAAA"},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
&v1.NUMANode{
ID: 1,
},
},
},
},
},
// CPUId 0 is missing from the list below to simulate
// that it is not allocatable (kube-reserved or system-reserved)
CpuIds: []int64{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
},
}
resAggr = NewResourcesAggregatorFromData(&fakeTopo, availRes)
Convey("When aggregating resources", func() {
podRes := []PodResources{
PodResources{
Name: "test-pod-0",
Namespace: "default",
Containers: []ContainerResources{
ContainerResources{
Name: "test-cnt-0",
Resources: []ResourceInfo{
ResourceInfo{
Name: "cpu",
Data: []string{"5", "7"},
},
ResourceInfo{
Name: "fake.io/net",
Data: []string{"netBBB"},
},
},
},
},
},
}
expected := topologyv1alpha1.ZoneList{
topologyv1alpha1.Zone{
Name: "node-0",
Type: "Node",
Costs: topologyv1alpha1.CostList{
topologyv1alpha1.CostInfo{
Name: "node-0",
Value: 10,
},
topologyv1alpha1.CostInfo{
Name: "node-1",
Value: 20,
},
},
Resources: topologyv1alpha1.ResourceInfoList{
topologyv1alpha1.ResourceInfo{
Name: "cpu",
Available: resource.MustParse("11"),
Allocatable: resource.MustParse("11"),
Capacity: resource.MustParse("12"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/net",
Available: resource.MustParse("1"),
Allocatable: resource.MustParse("1"),
Capacity: resource.MustParse("1"),
},
},
},
topologyv1alpha1.Zone{
Name: "node-1",
Type: "Node",
Costs: topologyv1alpha1.CostList{
topologyv1alpha1.CostInfo{
Name: "node-0",
Value: 20,
},
topologyv1alpha1.CostInfo{
Name: "node-1",
Value: 10,
},
},
Resources: topologyv1alpha1.ResourceInfoList{
topologyv1alpha1.ResourceInfo{
Name: "cpu",
Available: resource.MustParse("10"),
Allocatable: resource.MustParse("12"),
Capacity: resource.MustParse("12"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/gpu",
Available: resource.MustParse("1"),
Allocatable: resource.MustParse("1"),
Capacity: resource.MustParse("1"),
},
topologyv1alpha1.ResourceInfo{
Name: "fake.io/net",
Available: resource.MustParse("0"),
Allocatable: resource.MustParse("1"),
Capacity: resource.MustParse("1"),
},
},
},
}
res := resAggr.Aggregate(podRes)
sort.Slice(res, func(i, j int) bool {
return res[i].Name < res[j].Name
})
for _, resource := range res {
sort.Slice(resource.Costs, func(x, y int) bool {
return resource.Costs[x].Name < resource.Costs[y].Name
})
}
for _, resource := range res {
sort.Slice(resource.Resources, func(x, y int) bool {
return resource.Resources[x].Name < resource.Resources[y].Name
})
}
log.Printf("result=%v", res)
log.Printf("expected=%v", expected)
log.Printf("diff=%s", cmp.Diff(res, expected))
So(cmp.Equal(res, expected), ShouldBeTrue)
})
})
}
// ghwc topology -f json
var testTopology string = `{
"nodes": [
{
"id": 0,
"cores": [
{
"id": 0,
"index": 0,
"total_threads": 2,
"logical_processors": [
0,
12
]
},
{
"id": 10,
"index": 1,
"total_threads": 2,
"logical_processors": [
10,
22
]
},
{
"id": 1,
"index": 2,
"total_threads": 2,
"logical_processors": [
14,
2
]
},
{
"id": 2,
"index": 3,
"total_threads": 2,
"logical_processors": [
16,
4
]
},
{
"id": 8,
"index": 4,
"total_threads": 2,
"logical_processors": [
18,
6
]
},
{
"id": 9,
"index": 5,
"total_threads": 2,
"logical_processors": [
20,
8
]
}
],
"distances": [
10,
20
]
},
{
"id": 1,
"cores": [
{
"id": 0,
"index": 0,
"total_threads": 2,
"logical_processors": [
1,
13
]
},
{
"id": 10,
"index": 1,
"total_threads": 2,
"logical_processors": [
11,
23
]
},
{
"id": 1,
"index": 2,
"total_threads": 2,
"logical_processors": [
15,
3
]
},
{
"id": 2,
"index": 3,
"total_threads": 2,
"logical_processors": [
17,
5
]
},
{
"id": 8,
"index": 4,
"total_threads": 2,
"logical_processors": [
19,
7
]
},
{
"id": 9,
"index": 5,
"total_threads": 2,
"logical_processors": [
21,
9
]
}
],
"distances": [
20,
10
]
}
]
}`

View file

@ -0,0 +1,196 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"context"
"fmt"
"strconv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
)
type PodResourcesScanner struct {
namespace string
podResourceClient podresourcesapi.PodResourcesListerClient
apihelper apihelper.APIHelpers
}
func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi.PodResourcesListerClient, kubeApihelper apihelper.APIHelpers) (ResourcesScanner, error) {
resourcemonitorInstance := &PodResourcesScanner{
namespace: namespace,
podResourceClient: podResourceClient,
apihelper: kubeApihelper,
}
if resourcemonitorInstance.namespace != "*" {
klog.Infof("watching namespace %q", resourcemonitorInstance.namespace)
} else {
klog.Infof("watching all namespaces")
}
return resourcemonitorInstance, nil
}
// isWatchable tells whether the given pod should be watched, based on its namespace, QoS class and assigned devices.
func (resMon *PodResourcesScanner) isWatchable(podNamespace string, podName string, hasDevice bool) (bool, bool, error) {
cli, err := resMon.apihelper.GetClient()
if err != nil {
return false, false, err
}
pod, err := resMon.apihelper.GetPod(cli, podNamespace, podName)
if err != nil {
return false, false, err
}
klog.Infof("podresource: %s", podName)
isIntegralGuaranteed := hasExclusiveCPUs(pod)
if resMon.namespace == "*" && (isIntegralGuaranteed || hasDevice) {
return true, isIntegralGuaranteed, nil
}
// TODO: add an explicit check for guaranteed pods and pods with devices
return resMon.namespace == podNamespace && (isIntegralGuaranteed || hasDevice), isIntegralGuaranteed, nil
}
// hasExclusiveCPUs returns true if a guaranteed pod is allocated exclusive CPUs, else returns false.
// In the isWatchable() function we check the pod QoS and proceed only if it is guaranteed (i.e. request == limit),
// hence we only check the request in the function below.
func hasExclusiveCPUs(pod *v1.Pod) bool {
var totalCPU int64
var cpuQuantity resource.Quantity
for _, container := range pod.Spec.InitContainers {
var ok bool
if cpuQuantity, ok = container.Resources.Requests[v1.ResourceCPU]; !ok {
continue
}
totalCPU += cpuQuantity.Value()
isInitContainerGuaranteed := hasIntegralCPUs(pod, &container)
if !isInitContainerGuaranteed {
return false
}
}
for _, container := range pod.Spec.Containers {
var ok bool
if cpuQuantity, ok = container.Resources.Requests[v1.ResourceCPU]; !ok {
continue
}
totalCPU += cpuQuantity.Value()
isAppContainerGuaranteed := hasIntegralCPUs(pod, &container)
if !isAppContainerGuaranteed {
return false
}
}
// Return false if no CPUs were requested in any of the pod's containers
return totalCPU != 0
}
// hasIntegralCPUs returns true if the given container requests integral (whole) CPUs, else returns false
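// For example, a request of "2" CPUs is integral (Value()*1000 == 2000 == MilliValue()),
// while a request of "1500m" is not (Value()*1000 == 2000, MilliValue() == 1500).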
func hasIntegralCPUs(pod *v1.Pod, container *v1.Container) bool {
cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
return cpuQuantity.Value()*1000 == cpuQuantity.MilliValue()
}
// Scan gathers all the PodResources from the system, using the podresources API client.
func (resMon *PodResourcesScanner) Scan() ([]PodResources, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultPodResourcesTimeout)
defer cancel()
// Pod Resource API client
resp, err := resMon.podResourceClient.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
return nil, fmt.Errorf("can't receive response: %v.Get(_) = _, %w", resMon.podResourceClient, err)
}
var podResData []PodResources
for _, podResource := range resp.GetPodResources() {
klog.Infof("podresource iter: %s", podResource.GetName())
hasDevice := hasDevice(podResource)
isWatchable, isIntegralGuaranteed, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName(), hasDevice)
if err != nil {
return nil, fmt.Errorf("checking if pod in a namespace is watchable, namespace:%v, pod name %v: %v", podResource.GetNamespace(), podResource.GetName(), err)
}
if !isWatchable {
continue
}
podRes := PodResources{
Name: podResource.GetName(),
Namespace: podResource.GetNamespace(),
}
for _, container := range podResource.GetContainers() {
contRes := ContainerResources{
Name: container.Name,
}
if isIntegralGuaranteed {
cpuIDs := container.GetCpuIds()
if len(cpuIDs) > 0 {
var resCPUs []string
for _, cpuID := range container.GetCpuIds() {
resCPUs = append(resCPUs, strconv.FormatInt(cpuID, 10))
}
contRes.Resources = []ResourceInfo{
{
Name: v1.ResourceCPU,
Data: resCPUs,
},
}
}
}
for _, device := range container.GetDevices() {
contRes.Resources = append(contRes.Resources, ResourceInfo{
Name: v1.ResourceName(device.ResourceName),
Data: device.DeviceIds,
})
}
if len(contRes.Resources) == 0 {
continue
}
podRes.Containers = append(podRes.Containers, contRes)
}
if len(podRes.Containers) == 0 {
continue
}
podResData = append(podResData, podRes)
}
return podResData, nil
}
func hasDevice(podResource *podresourcesapi.PodResources) bool {
for _, container := range podResource.GetContainers() {
if len(container.GetDevices()) > 0 {
return true
}
}
klog.Infof("pod:%s doesn't have devices", podResource.GetName())
return false
}
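A hedged sketch of driving the scanner; podResClient and helpers are assumed to be constructed elsewhere (e.g. via GetPodResClient and the project's API helpers), and dumpWatchablePods is a hypothetical helper name:

package example

import (
    "fmt"

    podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"

    "sigs.k8s.io/node-feature-discovery/pkg/apihelper"
    "sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
)

// dumpWatchablePods scans the podresources API and prints the per-container
// resources of every watchable pod; "*" means watch all namespaces.
func dumpWatchablePods(podResClient podresourcesapi.PodResourcesListerClient, helpers apihelper.APIHelpers) error {
    scanner, err := resourcemonitor.NewPodResourcesScanner("*", podResClient, helpers)
    if err != nil {
        return err
    }
    podResources, err := scanner.Scan()
    if err != nil {
        return err
    }
    for _, pod := range podResources {
        for _, cnt := range pod.Containers {
            fmt.Printf("%s/%s/%s: %v\n", pod.Namespace, pod.Name, cnt.Name, cnt.Resources)
        }
    }
    return nil
}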

File diff suppressed because it is too large

View file

@ -0,0 +1,62 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcemonitor
import (
"time"
corev1 "k8s.io/api/core/v1"
topologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
)
// Args stores commandline arguments used for resource monitoring
type Args struct {
PodResourceSocketPath string
SleepInterval time.Duration
Namespace string
KubeletConfigFile string
}
// ResourceInfo stores information of resources and their corresponding IDs obtained from PodResource API
type ResourceInfo struct {
Name corev1.ResourceName
Data []string
}
// ContainerResources contains information about the node resources assigned to a container
type ContainerResources struct {
Name string
Resources []ResourceInfo
}
// PodResources contains information about the node resources assigned to a pod
type PodResources struct {
Name string
Namespace string
Containers []ContainerResources
}
// ResourcesScanner gathers all the PodResources from the system, using the podresources API client
type ResourcesScanner interface {
Scan() ([]PodResources, error)
}
// ResourcesAggregator aggregates resource information based on data received from the underlying hardware and the podresources API
type ResourcesAggregator interface {
Aggregate(podResData []PodResources) topologyv1alpha1.ZoneList
}
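Putting the two interfaces together, the intended flow is scan, then aggregate. A sketch under the assumption that the scanner and aggregator have already been constructed; collectZones is a hypothetical helper name:

package example

import (
    topologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"

    "sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
)

// collectZones runs one scan/aggregate cycle and returns the per-NUMA-node
// zones that feed the NodeResourceTopology CR.
func collectZones(scanner resourcemonitor.ResourcesScanner, aggregator resourcemonitor.ResourcesAggregator) (topologyv1alpha1.ZoneList, error) {
    podResources, err := scanner.Scan()
    if err != nil {
        return nil, err
    }
    return aggregator.Aggregate(podResources), nil
}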

View file

@ -0,0 +1,46 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologypolicy
import (
v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/apis/config"
)
// DetectTopologyPolicy returns a TopologyManagerPolicy value that represents
// both the Topology Manager policy and scope
func DetectTopologyPolicy(policy string, scope string) v1alpha1.TopologyManagerPolicy {
switch policy {
case config.SingleNumaNodeTopologyManagerPolicy:
if scope == config.PodTopologyManagerScope {
return v1alpha1.SingleNUMANodePodLevel
} else if scope == config.ContainerTopologyManagerScope {
return v1alpha1.SingleNUMANodeContainerLevel
} else {
// default scope for single-numa-node
return v1alpha1.SingleNUMANodeContainerLevel
}
case config.RestrictedTopologyManagerPolicy:
return v1alpha1.Restricted
case config.BestEffortTopologyManagerPolicy:
return v1alpha1.BestEffort
case config.NoneTopologyManagerPolicy:
return v1alpha1.None
default:
return v1alpha1.None
}
}
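For illustration, a kubelet configured with policy "single-numa-node" and scope "pod" maps as follows (a sketch; the printed value assumes the v1alpha1 constant's string form):

package main

import (
    "fmt"

    "sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
)

func main() {
    // "single-numa-node" and "pod" are the string values the kubelet
    // Topology Manager configuration uses for this policy and scope.
    policy := topologypolicy.DetectTopologyPolicy("single-numa-node", "pod")
    fmt.Println(policy) // expected: SingleNUMANodePodLevel
}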

View file

@ -0,0 +1,48 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
// Re-generate by running 'make mock'
package topologyupdater
import (
context "context"
grpc "google.golang.org/grpc"
mock "github.com/stretchr/testify/mock"
)
// MockNodeTopologyClient is an autogenerated mock type for the NodeTopologyClient type
type MockNodeTopologyClient struct {
mock.Mock
}
// UpdateNodeTopology provides a mock function with given fields: ctx, in, opts
func (_m *MockNodeTopologyClient) UpdateNodeTopology(ctx context.Context, in *NodeTopologyRequest, opts ...grpc.CallOption) (*NodeTopologyResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *NodeTopologyResponse
if rf, ok := ret.Get(0).(func(context.Context, *NodeTopologyRequest, ...grpc.CallOption) *NodeTopologyResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*NodeTopologyResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *NodeTopologyRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View file

@ -0,0 +1,354 @@
//
//Copyright 2021 The Kubernetes Authors.
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.23.0
// protoc v3.17.3
// source: pkg/topologyupdater/topology-updater.proto
package topologyupdater
import (
context "context"
proto "github.com/golang/protobuf/proto"
v1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type NodeTopologyRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
NfdVersion string `protobuf:"bytes,1,opt,name=nfd_version,json=nfdVersion,proto3" json:"nfd_version,omitempty"`
NodeName string `protobuf:"bytes,2,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"`
TopologyPolicies []string `protobuf:"bytes,3,rep,name=topology_policies,json=topologyPolicies,proto3" json:"topology_policies,omitempty"`
Zones []*v1alpha1.Zone `protobuf:"bytes,4,rep,name=zones,proto3" json:"zones,omitempty"`
}
func (x *NodeTopologyRequest) Reset() {
*x = NodeTopologyRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_topologyupdater_topology_updater_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *NodeTopologyRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*NodeTopologyRequest) ProtoMessage() {}
func (x *NodeTopologyRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkg_topologyupdater_topology_updater_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use NodeTopologyRequest.ProtoReflect.Descriptor instead.
func (*NodeTopologyRequest) Descriptor() ([]byte, []int) {
return file_pkg_topologyupdater_topology_updater_proto_rawDescGZIP(), []int{0}
}
func (x *NodeTopologyRequest) GetNfdVersion() string {
if x != nil {
return x.NfdVersion
}
return ""
}
func (x *NodeTopologyRequest) GetNodeName() string {
if x != nil {
return x.NodeName
}
return ""
}
func (x *NodeTopologyRequest) GetTopologyPolicies() []string {
if x != nil {
return x.TopologyPolicies
}
return nil
}
func (x *NodeTopologyRequest) GetZones() []*v1alpha1.Zone {
if x != nil {
return x.Zones
}
return nil
}
type NodeTopologyResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *NodeTopologyResponse) Reset() {
*x = NodeTopologyResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_topologyupdater_topology_updater_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *NodeTopologyResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*NodeTopologyResponse) ProtoMessage() {}
func (x *NodeTopologyResponse) ProtoReflect() protoreflect.Message {
mi := &file_pkg_topologyupdater_topology_updater_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use NodeTopologyResponse.ProtoReflect.Descriptor instead.
func (*NodeTopologyResponse) Descriptor() ([]byte, []int) {
return file_pkg_topologyupdater_topology_updater_proto_rawDescGZIP(), []int{1}
}
var File_pkg_topologyupdater_topology_updater_proto protoreflect.FileDescriptor
var file_pkg_topologyupdater_topology_updater_proto_rawDesc = []byte{
0x0a, 0x2a, 0x70, 0x6b, 0x67, 0x2f, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x75, 0x70,
0x64, 0x61, 0x74, 0x65, 0x72, 0x2f, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x2d, 0x75,
0x70, 0x64, 0x61, 0x74, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x74, 0x6f,
0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x72, 0x1a, 0x66, 0x67,
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x38, 0x73, 0x74, 0x6f, 0x70,
0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x61, 0x77, 0x61, 0x72, 0x65, 0x73, 0x63, 0x68, 0x65, 0x64, 0x77,
0x67, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x6f,
0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x2d, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61,
0x70, 0x69, 0x73, 0x2f, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x2f, 0x76, 0x31, 0x61,
0x6c, 0x70, 0x68, 0x61, 0x31, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa6, 0x01, 0x0a, 0x13, 0x4e, 0x6f, 0x64, 0x65, 0x54, 0x6f,
0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a,
0x0b, 0x6e, 0x66, 0x64, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0a, 0x6e, 0x66, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b,
0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x74,
0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x5f, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x69, 0x65, 0x73,
0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79,
0x50, 0x6f, 0x6c, 0x69, 0x63, 0x69, 0x65, 0x73, 0x12, 0x24, 0x0a, 0x05, 0x7a, 0x6f, 0x6e, 0x65,
0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68,
0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x52, 0x05, 0x7a, 0x6f, 0x6e, 0x65, 0x73, 0x22, 0x16,
0x0a, 0x14, 0x4e, 0x6f, 0x64, 0x65, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x71, 0x0a, 0x0c, 0x4e, 0x6f, 0x64, 0x65, 0x54, 0x6f,
0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x12, 0x61, 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65,
0x4e, 0x6f, 0x64, 0x65, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x12, 0x24, 0x2e, 0x74,
0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x72, 0x2e, 0x4e,
0x6f, 0x64, 0x65, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x25, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x75, 0x70, 0x64,
0x61, 0x74, 0x65, 0x72, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67,
0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x38, 0x5a, 0x36, 0x73, 0x69, 0x67,
0x73, 0x2e, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2d, 0x66, 0x65,
0x61, 0x74, 0x75, 0x72, 0x65, 0x2d, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f,
0x70, 0x6b, 0x67, 0x2f, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x75, 0x70, 0x64, 0x61,
0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pkg_topologyupdater_topology_updater_proto_rawDescOnce sync.Once
file_pkg_topologyupdater_topology_updater_proto_rawDescData = file_pkg_topologyupdater_topology_updater_proto_rawDesc
)
func file_pkg_topologyupdater_topology_updater_proto_rawDescGZIP() []byte {
file_pkg_topologyupdater_topology_updater_proto_rawDescOnce.Do(func() {
file_pkg_topologyupdater_topology_updater_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_topologyupdater_topology_updater_proto_rawDescData)
})
return file_pkg_topologyupdater_topology_updater_proto_rawDescData
}
var file_pkg_topologyupdater_topology_updater_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pkg_topologyupdater_topology_updater_proto_goTypes = []interface{}{
(*NodeTopologyRequest)(nil), // 0: topologyupdater.NodeTopologyRequest
(*NodeTopologyResponse)(nil), // 1: topologyupdater.NodeTopologyResponse
(*v1alpha1.Zone)(nil), // 2: v1alpha1.Zone
}
var file_pkg_topologyupdater_topology_updater_proto_depIdxs = []int32{
2, // 0: topologyupdater.NodeTopologyRequest.zones:type_name -> v1alpha1.Zone
0, // 1: topologyupdater.NodeTopology.UpdateNodeTopology:input_type -> topologyupdater.NodeTopologyRequest
1, // 2: topologyupdater.NodeTopology.UpdateNodeTopology:output_type -> topologyupdater.NodeTopologyResponse
2, // [2:3] is the sub-list for method output_type
1, // [1:2] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_pkg_topologyupdater_topology_updater_proto_init() }
func file_pkg_topologyupdater_topology_updater_proto_init() {
if File_pkg_topologyupdater_topology_updater_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pkg_topologyupdater_topology_updater_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*NodeTopologyRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkg_topologyupdater_topology_updater_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*NodeTopologyResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkg_topologyupdater_topology_updater_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pkg_topologyupdater_topology_updater_proto_goTypes,
DependencyIndexes: file_pkg_topologyupdater_topology_updater_proto_depIdxs,
MessageInfos: file_pkg_topologyupdater_topology_updater_proto_msgTypes,
}.Build()
File_pkg_topologyupdater_topology_updater_proto = out.File
file_pkg_topologyupdater_topology_updater_proto_rawDesc = nil
file_pkg_topologyupdater_topology_updater_proto_goTypes = nil
file_pkg_topologyupdater_topology_updater_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// NodeTopologyClient is the client API for NodeTopology service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type NodeTopologyClient interface {
UpdateNodeTopology(ctx context.Context, in *NodeTopologyRequest, opts ...grpc.CallOption) (*NodeTopologyResponse, error)
}
type nodeTopologyClient struct {
cc grpc.ClientConnInterface
}
func NewNodeTopologyClient(cc grpc.ClientConnInterface) NodeTopologyClient {
return &nodeTopologyClient{cc}
}
func (c *nodeTopologyClient) UpdateNodeTopology(ctx context.Context, in *NodeTopologyRequest, opts ...grpc.CallOption) (*NodeTopologyResponse, error) {
out := new(NodeTopologyResponse)
err := c.cc.Invoke(ctx, "/topologyupdater.NodeTopology/UpdateNodeTopology", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// NodeTopologyServer is the server API for NodeTopology service.
type NodeTopologyServer interface {
UpdateNodeTopology(context.Context, *NodeTopologyRequest) (*NodeTopologyResponse, error)
}
// UnimplementedNodeTopologyServer can be embedded to have forward compatible implementations.
type UnimplementedNodeTopologyServer struct {
}
func (*UnimplementedNodeTopologyServer) UpdateNodeTopology(context.Context, *NodeTopologyRequest) (*NodeTopologyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateNodeTopology not implemented")
}
func RegisterNodeTopologyServer(s *grpc.Server, srv NodeTopologyServer) {
s.RegisterService(&_NodeTopology_serviceDesc, srv)
}
func _NodeTopology_UpdateNodeTopology_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(NodeTopologyRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(NodeTopologyServer).UpdateNodeTopology(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/topologyupdater.NodeTopology/UpdateNodeTopology",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NodeTopologyServer).UpdateNodeTopology(ctx, req.(*NodeTopologyRequest))
}
return interceptor(ctx, in, info, handler)
}
var _NodeTopology_serviceDesc = grpc.ServiceDesc{
ServiceName: "topologyupdater.NodeTopology",
HandlerType: (*NodeTopologyServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "UpdateNodeTopology",
Handler: _NodeTopology_UpdateNodeTopology_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pkg/topologyupdater/topology-updater.proto",
}
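A minimal client-side sketch against this service; the address, port and insecure dial option are assumptions (a real deployment would typically use TLS):

package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"

    "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater"
)

func main() {
    // Assumed nfd-master address; adjust for your deployment.
    conn, err := grpc.Dial("nfd-master:8080", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("failed to connect: %v", err)
    }
    defer conn.Close()

    client := topologyupdater.NewNodeTopologyClient(conn)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Field values are illustrative only.
    _, err = client.UpdateNodeTopology(ctx, &topologyupdater.NodeTopologyRequest{
        NfdVersion:       "v0.9.0",
        NodeName:         "worker-0",
        TopologyPolicies: []string{"SingleNUMANodeContainerLevel"},
    })
    if err != nil {
        log.Fatalf("failed to update node topology: %v", err)
    }
}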

View file

@ -0,0 +1,35 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
option go_package = "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater";
import "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1/generated.proto";
package topologyupdater;
service NodeTopology {
rpc UpdateNodeTopology(NodeTopologyRequest) returns (NodeTopologyResponse);
}
message NodeTopologyRequest {
string nfd_version = 1;
string node_name = 2;
repeated string topology_policies = 3;
repeated v1alpha1.Zone zones = 4;
}
message NodeTopologyResponse {
}
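On the receiving side, a skeleton server for the service defined above might look like this; the handler body is a placeholder (the real nfd-master would persist the zones into a NodeResourceTopology CR), and the port is an assumption:

package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"

    "sigs.k8s.io/node-feature-discovery/pkg/topologyupdater"
)

type topologyServer struct{}

func (s *topologyServer) UpdateNodeTopology(ctx context.Context, req *topologyupdater.NodeTopologyRequest) (*topologyupdater.NodeTopologyResponse, error) {
    // Placeholder: a real server would create/update a NodeResourceTopology
    // CR for req.NodeName from req.Zones.
    log.Printf("received topology for node %q (%d zones)", req.NodeName, len(req.Zones))
    return &topologyupdater.NodeTopologyResponse{}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":8080") // assumed port
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    srv := grpc.NewServer()
    topologyupdater.RegisterNodeTopologyServer(srv, &topologyServer{})
    if err := srv.Serve(lis); err != nil {
        log.Fatalf("server error: %v", err)
    }
}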

View file

@ -30,6 +30,8 @@ var (
SysfsDir = HostDir(pathPrefix + "sys")
// UsrPath is where the /usr directory of the system to be inspected is located
UsrDir = HostDir(pathPrefix + "usr")
// VarPath is where the /var directory of the system to be inspected is located
VarDir = HostDir(pathPrefix + "var")
)
// HostDir is a helper for handling host system directories

View file

@ -0,0 +1,42 @@
apiVersion: kubelet.config.k8s.io/v1beta1
authentication:
anonymous:
enabled: false
webhook:
cacheTTL: 0s
enabled: true
x509:
clientCAFile: /etc/kubernetes/pki/ca.crt
authorization:
mode: Webhook
webhook:
cacheAuthorizedTTL: 0s
cacheUnauthorizedTTL: 0s
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
cpuManagerPolicy: static
cpuManagerReconcilePeriod: 0s
evictionHard:
imagefs.available: 0%
nodefs.available: 0%
nodefs.inodesFree: 0%
evictionPressureTransitionPeriod: 0s
fileCheckFrequency: 0s
healthzBindAddress: 127.0.0.1
healthzPort: 10248
httpCheckFrequency: 0s
imageGCHighThresholdPercent: 100
imageMinimumGCAge: 0s
kind: KubeletConfiguration
logging: {}
nodeStatusReportFrequency: 0s
nodeStatusUpdateFrequency: 0s
reservedSystemCPUs: 1,3
rotateCertificates: true
runtimeRequestTimeout: 0s
staticPodPath: /etc/kubernetes/manifests
streamingConnectionIdleTimeout: 0s
syncFrequency: 0s
topologyManagerPolicy: single-numa-node
volumeStatsAggPeriod: 0s
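This fixture exercises the kubelet-config-reading path; a sketch of how it might be consumed (the file path is an assumption, and the scope is unset in the fixture so the default container scope applies):

package main

import (
    "fmt"
    "log"

    "sigs.k8s.io/node-feature-discovery/pkg/kubeconf"
    "sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
)

func main() {
    // Assumed path to the fixture above.
    klConfig, err := kubeconf.GetKubeletConfigFromLocalFile("kubeletconf.yaml")
    if err != nil {
        log.Fatalf("error reading kubelet config: %v", err)
    }
    // With this fixture: policy "single-numa-node", no explicit scope.
    policy := topologypolicy.DetectTopologyPolicy(klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope)
    fmt.Println(policy) // expected: SingleNUMANodeContainerLevel
}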

View file

@ -0,0 +1,12 @@
-----BEGIN CERTIFICATE-----
MIIBwzCCAW2gAwIBAgIJAMRplUIVEGN7MA0GCSqGSIb3DQEBCwUAMFsxCzAJBgNV
BAYTAkZJMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQxFDASBgNVBAMMC25mZC10ZXN0LWNhMB4XDTIwMTExODEz
NTIxOFoXDTMwMTExNjEzNTIxOFowDTELMAkGA1UEBhMCRkkwgZ8wDQYJKoZIhvcN
AQEBBQADgY0AMIGJAoGBAOX+AzRK17SFuhgSeWrf+B8SZUEdBhZGjBK6ypmAZgMW
6JWwTbsdJRU6uorTX2XvgOtRolXbwSTwWjFpkvYgK0Eo8kKLU5tOE6XYi04UVYRv
Ha26pQKpfseUug641GOtUlWYUgGmvSMikEIilh+b/aAG+5/K+fNqi08fLmX6t2XL
AgMBAAGjHjAcMBoGA1UdEQQTMBGCD25mZC10ZXN0LXdvcmtlcjANBgkqhkiG9w0B
AQsFAANBAGuSwyPjduIQz4n7L+tRPs3+XPJ8fvbzC0pADYA0geow/m+X784If8nT
Pj+8quQn9zPsaQ+1bNahobTlHRmQcPE=
-----END CERTIFICATE-----

View file

@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBAOX+AzRK17SFuhgS
eWrf+B8SZUEdBhZGjBK6ypmAZgMW6JWwTbsdJRU6uorTX2XvgOtRolXbwSTwWjFp
kvYgK0Eo8kKLU5tOE6XYi04UVYRvHa26pQKpfseUug641GOtUlWYUgGmvSMikEIi
lh+b/aAG+5/K+fNqi08fLmX6t2XLAgMBAAECgYEAqsA7gMdP/iaKUvTkUASYIfl2
UzFJI6CcvgsP/4bkNcb8RqXuD81Dis9fT1I+sV9vR0YET9onO1V2oNjQ0wpvETjO
bk5veRfqFLOTavl64pAPGLEvOTWHSHQ9rtFZst1FFfehB1Vw69nBs9E40Zo2Y9yv
gkK+RIKUc2oPqMOigQECQQD8k2jxRX1COF+GO+pBTOTAr3pAmd0ahWAQGoqLwteQ
x+ARRZss1nuX0IGEyJ89hD6dHLv/FhhUxGE1R0xdQ31JAkEA6Rw5VYrAphATPCCX
h2gboAbHTOFAzwjnlW1pU6nlZI89kDAD3TF8d+eq906J8y7ji0YE89/G4HEzHqtQ
vMsucwJBAId2VAlauJRkga8PwVKmd+Vz98BgBTqtH9ljMr1EkbK/0EfTKieBHSZO
GLjrlKQ8ogxHlfh4lDIaZPxbMfSvNqkCQDkEfEmeHK0BtZK5bhbisg8cWVdGqXF6
fhqgnmimX8OO/cHs3KUX25gAhGLlRPzEdUe1orR8AcsYJSbVRHRJRl0CQQC7VBgp
04kaZzLFf61TmqMGVDoG2Wi5HwXYyzAEEEYFW61kwfZ6vuq3AP7NPMfW1F94welg
8LfkI2NBgjyKGiqn
-----END PRIVATE KEY-----