
e2e: add basic topology updater test

Co-authored-by: Swati Sehgal <swsehgal@redhat.com>
Co-authored-by: Francesco Romani <fromani@redhat.com>
Signed-off-by: Artyom Lukianov <alukiano@redhat.com>
Francesco Romani 2022-06-14 10:51:33 +02:00
parent 622adf3863
commit 7d37f72480
8 changed files with 905 additions and 12 deletions

go.mod

@@ -21,12 +21,14 @@ require (
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1
k8s.io/api v0.24.2
k8s.io/apiextensions-apiserver v0.0.0
k8s.io/apimachinery v0.24.2
k8s.io/client-go v0.24.2
k8s.io/klog/v2 v2.60.1
k8s.io/kubectl v0.24.2
k8s.io/kubelet v0.24.2
k8s.io/kubernetes v1.24.2
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
sigs.k8s.io/yaml v1.2.0
)
@@ -180,7 +182,6 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
howett.net/plist v0.0.0-20181124034731-591f970eefbb // indirect
k8s.io/apiextensions-apiserver v0.0.0 // indirect
k8s.io/apiserver v0.24.2 // indirect
k8s.io/cloud-provider v0.24.2 // indirect
k8s.io/component-base v0.24.2 // indirect
@@ -193,7 +194,6 @@ require (
k8s.io/legacy-cloud-providers v0.0.0 // indirect
k8s.io/mount-utils v0.24.2 // indirect
k8s.io/pod-security-admission v0.0.0 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect


@@ -112,4 +112,7 @@ defaultFeatures:
expectedAnnotationKeys:
- "nfd.node.kubernetes.io/worker.version"
- "nfd.node.kubernetes.io/feature-labels"
kubelet:
configPath: "/var/lib/kubelet/config.yaml"
podResourcesSocketPath: "/var/lib/kubelet/pod-resources/kubelet.sock"
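The new kubelet section is optional: when it is missing or only partially set, GetKubeletConfig (added to the e2e utils further down in this diff) falls back to DefaultConfigPath and DefaultPodResourcesSocketPath. A minimal sketch of that fallback behaviour, assuming the utils package is importable as in the test file below; the override path is invented for illustration:

package main

import (
	"fmt"

	testutils "sigs.k8s.io/node-feature-discovery/test/e2e/utils"
)

func main() {
	// No kubelet section in the e2e config: both paths fall back to the defaults.
	defaults := (&testutils.E2EConfig{}).GetKubeletConfig()
	fmt.Println(defaults.ConfigPath, defaults.PodResourcesSocketPath)

	// Only ConfigPath overridden (hypothetical path): PodResourcesSocketPath keeps its default.
	custom := (&testutils.E2EConfig{
		Kubelet: &testutils.KubeletConfig{ConfigPath: "/etc/kubernetes/kubelet-config.yaml"},
	}).GetKubeletConfig()
	fmt.Println(custom.ConfigPath, custom.PodResourcesSocketPath)
}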


@@ -0,0 +1,267 @@
/*
Copyright 2020-2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"context"
"fmt"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
v1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/test/e2e/framework"
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
testutils "sigs.k8s.io/node-feature-discovery/test/e2e/utils"
)
var _ = SIGDescribe("Node Feature Discovery topology updater", func() {
var (
extClient *extclient.Clientset
topologyClient *topologyclientset.Clientset
crd *apiextensionsv1.CustomResourceDefinition
topologyUpdaterNode *v1.Node
workerNodes []v1.Node
kubeletConfig *kubeletconfig.KubeletConfiguration
)
f := framework.NewDefaultFramework("node-topology-updater")
BeforeEach(func() {
var err error
if extClient == nil {
extClient, err = extclient.NewForConfig(f.ClientConfig())
Expect(err).NotTo(HaveOccurred())
}
if topologyClient == nil {
topologyClient, err = topologyclientset.NewForConfig(f.ClientConfig())
Expect(err).NotTo(HaveOccurred())
}
cfg, err := testutils.GetConfig()
Expect(err).ToNot(HaveOccurred())
kcfg := cfg.GetKubeletConfig()
By(fmt.Sprintf("Using config (%#v)", kcfg))
By("Creating the node resource topologies CRD")
crd, err = testutils.CreateNodeResourceTopologies(extClient)
Expect(err).NotTo(HaveOccurred())
err = testutils.ConfigureRBAC(f.ClientSet, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
image := fmt.Sprintf("%s:%s", *dockerRepo, *dockerTag)
f.PodClient().CreateSync(testutils.NFDMasterPod(image, false))
// Create nfd-master service
masterService, err := testutils.CreateService(f.ClientSet, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
By("Waiting for the nfd-master service to be up")
Expect(e2enetwork.WaitForService(f.ClientSet, f.Namespace.Name, masterService.Name, true, time.Second, 10*time.Second)).NotTo(HaveOccurred())
By("Creating nfd-topology-updater daemonset")
topologyUpdaterDaemonSet := testutils.NFDTopologyUpdaterDaemonSet(kcfg, fmt.Sprintf("%s:%s", *dockerRepo, *dockerTag), []string{})
topologyUpdaterDaemonSet, err = f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Create(context.TODO(), topologyUpdaterDaemonSet, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
By("Waiting for daemonset pods to be ready")
Expect(testutils.WaitForPodsReady(f.ClientSet, f.Namespace.Name, topologyUpdaterDaemonSet.Spec.Template.Labels["name"], 5)).NotTo(HaveOccurred())
label := labels.SelectorFromSet(map[string]string{"name": topologyUpdaterDaemonSet.Spec.Template.Labels["name"]})
pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: label.String()})
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).ToNot(BeEmpty())
topologyUpdaterNode, err = f.ClientSet.CoreV1().Nodes().Get(context.TODO(), pods.Items[0].Spec.NodeName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
kubeletConfig, err = e2ekubelet.GetCurrentKubeletConfig(topologyUpdaterNode.Name, "", true)
Expect(err).NotTo(HaveOccurred())
workerNodes, err = testutils.GetWorkerNodes(f)
Expect(err).NotTo(HaveOccurred())
})
Context("with single nfd-master pod", func() {
It("should fill the node resource topologies CR with the data", func() {
nodeTopology := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
isValid := testutils.IsValidNodeTopology(nodeTopology, kubeletConfig)
Expect(isValid).To(BeTrue(), "received invalid topology: %v", nodeTopology)
})
It("it should not account for any cpus if a container doesn't request exclusive cpus (best effort QOS)", func() {
By("getting the initial topology information")
initialNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
By("creating a pod consuming resources from the shared, non-exclusive CPU pool (best-effort QoS)")
sleeperPod := testutils.BestEffortSleeperPod()
podMap := make(map[string]*v1.Pod)
pod := f.PodClient().CreateSync(sleeperPod)
podMap[pod.Name] = pod
defer testutils.DeletePodsAsync(f, podMap)
cooldown := 30 * time.Second
By(fmt.Sprintf("getting the updated topology - sleeping for %v", cooldown))
// the object, hence its resource version, must NOT change, so we can only sleep
time.Sleep(cooldown)
By("checking the changes in the updated topology - expecting none")
finalNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
initialAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(initialNodeTopo)
finalAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(finalNodeTopo)
if len(initialAllocRes) == 0 || len(finalAllocRes) == 0 {
Fail(fmt.Sprintf("failed to find allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes))
}
zoneName, resName, cmp, ok := testutils.CompareAllocatableResources(initialAllocRes, finalAllocRes)
framework.Logf("zone=%q resource=%q cmp=%v ok=%v", zoneName, resName, cmp, ok)
if !ok {
Fail(fmt.Sprintf("failed to compare allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes))
}
// This is actually a workaround.
// Depending on the (random, by design) order in which ginkgo runs the tests, a test which exclusively allocates CPUs may have run before this one.
// We cannot (nor should we) care about what runs before this test, but we know that this may happen.
// The proper solution is to wait for ALL the containers requesting exclusive resources to be gone before ending the related test.
// To date, we don't yet have a clean way to wait for these pods (actually containers) to be completely gone
// (hence releasing the exclusively allocated CPUs) before the test ends, so this test can run with some leftovers hanging around,
// which makes the accounting harder. And this is what we handle here.
isGreaterEqual := (cmp >= 0)
Expect(isGreaterEqual).To(BeTrue(), fmt.Sprintf("final allocatable resources not restored - cmp=%d initial=%v final=%v", cmp, initialAllocRes, finalAllocRes))
})
It("it should not account for any cpus if a container doesn't request exclusive cpus (guaranteed QOS, nonintegral cpu request)", func() {
By("getting the initial topology information")
initialNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
By("creating a pod consuming resources from the shared, non-exclusive CPU pool (guaranteed QoS, nonintegral request)")
sleeperPod := testutils.GuaranteedSleeperPod("500m")
podMap := make(map[string]*v1.Pod)
pod := f.PodClient().CreateSync(sleeperPod)
podMap[pod.Name] = pod
defer testutils.DeletePodsAsync(f, podMap)
cooldown := 30 * time.Second
By(fmt.Sprintf("getting the updated topology - sleeping for %v", cooldown))
// the object, hence its resource version, must NOT change, so we can only sleep
time.Sleep(cooldown)
By("checking the changes in the updated topology - expecting none")
finalNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
initialAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(initialNodeTopo)
finalAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(finalNodeTopo)
if len(initialAllocRes) == 0 || len(finalAllocRes) == 0 {
Fail(fmt.Sprintf("failed to find allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes))
}
zoneName, resName, cmp, ok := testutils.CompareAllocatableResources(initialAllocRes, finalAllocRes)
framework.Logf("zone=%q resource=%q cmp=%v ok=%v", zoneName, resName, cmp, ok)
if !ok {
Fail(fmt.Sprintf("failed to compare allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes))
}
// This is actually a workaround.
// Depending on the (random, by design) order in which ginkgo runs the tests, a test which exclusively allocates CPUs may have run before this one.
// We cannot (nor should we) care about what runs before this test, but we know that this may happen.
// The proper solution is to wait for ALL the containers requesting exclusive resources to be gone before ending the related test.
// To date, we don't yet have a clean way to wait for these pods (actually containers) to be completely gone
// (hence releasing the exclusively allocated CPUs) before the test ends, so this test can run with some leftovers hanging around,
// which makes the accounting harder. And this is what we handle here.
isGreaterEqual := (cmp >= 0)
Expect(isGreaterEqual).To(BeTrue(), fmt.Sprintf("final allocatable resources not restored - cmp=%d initial=%v final=%v", cmp, initialAllocRes, finalAllocRes))
})
It("it should account for containers requesting exclusive cpus", func() {
nodes, err := testutils.FilterNodesWithEnoughCores(workerNodes, "1000m")
Expect(err).NotTo(HaveOccurred())
if len(nodes) < 1 {
Skip("not enough allocatable cores for this test")
}
By("getting the initial topology information")
initialNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
By("creating a pod consuming exclusive CPUs")
sleeperPod := testutils.GuaranteedSleeperPod("1000m")
podMap := make(map[string]*v1.Pod)
pod := f.PodClient().CreateSync(sleeperPod)
podMap[pod.Name] = pod
defer testutils.DeletePodsAsync(f, podMap)
By("getting the updated topology")
var finalNodeTopo *v1alpha1.NodeResourceTopology
Eventually(func() bool {
finalNodeTopo, err = topologyClient.TopologyV1alpha1().NodeResourceTopologies().Get(context.TODO(), topologyUpdaterNode.Name, metav1.GetOptions{})
if err != nil {
framework.Logf("failed to get the node topology resource: %v", err)
return false
}
return finalNodeTopo.ObjectMeta.ResourceVersion != initialNodeTopo.ObjectMeta.ResourceVersion
}, time.Minute, 5*time.Second).Should(BeTrue(), "didn't get updated node topology info")
By("checking the changes in the updated topology")
initialAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(initialNodeTopo)
finalAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(finalNodeTopo)
if len(initialAllocRes) == 0 || len(finalAllocRes) == 0 {
Fail(fmt.Sprintf("failed to find allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes))
}
zoneName, resName, isLess := lessAllocatableResources(initialAllocRes, finalAllocRes)
framework.Logf("zone=%q resource=%q isLess=%v", zoneName, resName, isLess)
Expect(isLess).To(BeTrue(), fmt.Sprintf("final allocatable resources not decreased - initial=%v final=%v", initialAllocRes, finalAllocRes))
})
})
JustAfterEach(func() {
err := testutils.DeconfigureRBAC(f.ClientSet, f.Namespace.Name)
if err != nil {
framework.Logf("failed to delete RBAC resources: %v", err)
}
err = extClient.ApiextensionsV1().CustomResourceDefinitions().Delete(context.TODO(), crd.Name, metav1.DeleteOptions{})
if err != nil {
framework.Logf("failed to delete node resources topologies CRD: %v", err)
}
})
})
// lessAllocatableResources specializes CompareAllocatableResources for this specific e2e use case.
func lessAllocatableResources(expected, got map[string]v1.ResourceList) (string, string, bool) {
zoneName, resName, cmp, ok := testutils.CompareAllocatableResources(expected, got)
if !ok {
framework.Logf("-> cmp failed (not ok)")
return "", "", false
}
if cmp < 0 {
return zoneName, resName, true
}
framework.Logf("-> cmp failed (value=%d)", cmp)
return "", "", false
}
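A note on the cmp checks used in the specs above: CompareAllocatableResources (added to the e2e utils further down in this diff) compares quantities as got.Cmp(expected), so with the (initialAllocRes, finalAllocRes) argument order a negative cmp means the final allocatable shrank, zero means no change, and a positive value means it grew, which the workaround comments tolerate via cmp >= 0. A self-contained sketch of that interpretation, with quantities invented for illustration:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Allocatable CPU reported for one zone before and after the sleeper pod is
	// created; the quantities are invented for illustration only.
	initial := resource.MustParse("4")
	final := resource.MustParse("3")

	// Same comparison direction as CompareAllocatableResources(initial, final):
	// the "got" (final) quantity is compared against the "expected" (initial) one.
	switch cmp := final.Cmp(initial); {
	case cmp < 0:
		fmt.Println("allocatable decreased: expected when exclusive CPUs are assigned")
	case cmp == 0:
		fmt.Println("allocatable unchanged: expected for best-effort / fractional-CPU pods")
	default:
		fmt.Println("allocatable increased: tolerated by the cmp >= 0 workaround above")
	}
}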


@@ -26,18 +26,48 @@ import (
"sigs.k8s.io/yaml"
)
const (
DefaultConfigPath = "/var/lib/kubelet/config.yaml"
DefaultPodResourcesSocketPath = "/var/lib/kubelet/pod-resources/kubelet.sock"
)
var (
e2eConfigFile = flag.String("nfd.e2e-config", "", "Configuration parameters for end-to-end tests")
config *E2EConfig
)
// KubeletConfig stores the paths to the kubelet artifacts used by the e2e tests.
type KubeletConfig struct {
ConfigPath string
PodResourcesSocketPath string
}
type E2EConfig struct {
DefaultFeatures *struct {
LabelWhitelist lookupMap
AnnotationWhitelist lookupMap
Nodes []NodeConfig
}
Kubelet *KubeletConfig
}
// GetKubeletConfig returns a KubeletConfig object with default values, possibly overridden by user settings.
func (conf *E2EConfig) GetKubeletConfig() KubeletConfig {
kcfg := KubeletConfig{
ConfigPath: DefaultConfigPath,
PodResourcesSocketPath: DefaultPodResourcesSocketPath,
}
if conf.Kubelet == nil {
return kcfg
}
if conf.Kubelet.ConfigPath != "" {
kcfg.ConfigPath = conf.Kubelet.ConfigPath
}
if conf.Kubelet.PodResourcesSocketPath != "" {
kcfg.PodResourcesSocketPath = conf.Kubelet.PodResourcesSocketPath
}
return kcfg
}
type NodeConfig struct {

test/e2e/utils/node.go

@@ -0,0 +1,87 @@
/*
Copyright 2021-2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/test/e2e/framework"
)
const (
// RoleWorker contains the worker role
RoleWorker = "worker"
)
const (
// LabelRole contains the key for the role label
LabelRole = "node-role.kubernetes.io"
// LabelHostname contains the key for the hostname label
LabelHostname = "kubernetes.io/hostname"
)
// GetWorkerNodes returns all nodes labeled as worker
func GetWorkerNodes(f *framework.Framework) ([]v1.Node, error) {
return GetNodesByRole(f, RoleWorker)
}
// GetNodesByRole returns all nodes with the specified role
func GetNodesByRole(f *framework.Framework, role string) ([]v1.Node, error) {
selector, err := labels.Parse(fmt.Sprintf("%s/%s=", LabelRole, role))
if err != nil {
return nil, err
}
return GetNodesBySelector(f, selector)
}
// GetNodesBySelector returns all nodes matching the specified selector
func GetNodesBySelector(f *framework.Framework, selector labels.Selector) ([]v1.Node, error) {
nodes, err := f.ClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return nil, err
}
return nodes.Items, nil
}
// FilterNodesWithEnoughCores returns all nodes with more than the given amount of allocatable CPU
func FilterNodesWithEnoughCores(nodes []v1.Node, cpuAmount string) ([]v1.Node, error) {
requestCpu := resource.MustParse(cpuAmount)
framework.Logf("checking request %v on %d nodes", requestCpu, len(nodes))
resNodes := []v1.Node{}
for _, node := range nodes {
availCpu, ok := node.Status.Allocatable[v1.ResourceCPU]
if !ok || availCpu.IsZero() {
return nil, fmt.Errorf("node %q has no allocatable CPU", node.Name)
}
if availCpu.Cmp(requestCpu) < 1 {
framework.Logf("node %q available cpu %v requested cpu %v", node.Name, availCpu, requestCpu)
continue
}
framework.Logf("node %q has enough resources, cluster OK", node.Name)
resNodes = append(resNodes, node)
}
return resNodes, nil
}


@@ -0,0 +1,249 @@
/*
Copyright 2020-2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
"github.com/onsi/gomega"
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
v1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/test/e2e/framework"
)
func init() {
// make golangci-lint happy
_ = apiextensionsv1.AddToScheme(scheme.Scheme)
}
// NewNodeResourceTopologies makes a CRD golang object representing NodeResourceTopology definition
func NewNodeResourceTopologies() (*apiextensionsv1.CustomResourceDefinition, error) {
_, file, _, ok := runtime.Caller(0)
if !ok {
return nil, fmt.Errorf("cannot retrieve manifests directory")
}
baseDir := filepath.Dir(file)
crdPath := filepath.Clean(filepath.Join(baseDir, "..", "..", "..", "deployment", "base", "noderesourcetopologies-crd", "noderesourcetopologies.yaml"))
data, err := os.ReadFile(crdPath)
if err != nil {
return nil, err
}
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(data, nil, nil)
if err != nil {
return nil, err
}
crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
return nil, fmt.Errorf("unexpected type, got %t", obj)
}
return crd, nil
}
// CreateNodeResourceTopologies creates the NodeResourceTopology CRD in the cluster if it doesn't exist already.
// Returns the CRD golang object present in the cluster.
func CreateNodeResourceTopologies(extClient extclient.Interface) (*apiextensionsv1.CustomResourceDefinition, error) {
crd, err := NewNodeResourceTopologies()
if err != nil {
return nil, err
}
updatedCrd, err := extClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crd.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return nil, err
}
if err == nil {
return updatedCrd, nil
}
return extClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), crd, metav1.CreateOptions{})
}
// GetNodeTopology returns the NodeResourceTopology data for the node identified by `nodeName`.
func GetNodeTopology(topologyClient *topologyclientset.Clientset, nodeName string) *v1alpha1.NodeResourceTopology {
var nodeTopology *v1alpha1.NodeResourceTopology
var err error
gomega.EventuallyWithOffset(1, func() bool {
nodeTopology, err = topologyClient.TopologyV1alpha1().NodeResourceTopologies().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
framework.Logf("failed to get the node topology resource: %v", err)
return false
}
return true
}, time.Minute, 5*time.Second).Should(gomega.BeTrue())
return nodeTopology
}
// AllocatableResourceListFromNodeResourceTopology extracts the map zone:allocatableResources from the given NodeResourceTopology instance.
func AllocatableResourceListFromNodeResourceTopology(nodeTopo *v1alpha1.NodeResourceTopology) map[string]v1.ResourceList {
allocRes := make(map[string]v1.ResourceList)
for _, zone := range nodeTopo.Zones {
if zone.Type != "Node" {
continue
}
resList := make(v1.ResourceList)
for _, res := range zone.Resources {
resList[v1.ResourceName(res.Name)] = res.Allocatable.DeepCopy()
}
if len(resList) == 0 {
continue
}
allocRes[zone.Name] = resList
}
return allocRes
}
// CompareAllocatableResources compares `expected` and `got` map zone:allocatableResources respectively (see: AllocatableResourceListFromNodeResourceTopology),
// and informs the caller if the maps are equal. Here `equal` means the same zoneNames with the same resources, where the resources are equal if they have
// the same resources with the same quantities. Returns the name of the different zone, the name of the different resources within the zone,
// the comparison result (same semantic as strings.Compare) and a boolean that reports if the resourceLists are consistent. See `CompareResourceList`.
func CompareAllocatableResources(expected, got map[string]v1.ResourceList) (string, string, int, bool) {
if len(got) != len(expected) {
framework.Logf("-> expected=%v (len=%d) got=%v (len=%d)", expected, len(expected), got, len(got))
return "", "", 0, false
}
for expZoneName, expResList := range expected {
gotResList, ok := got[expZoneName]
if !ok {
return expZoneName, "", 0, false
}
if resName, cmp, ok := CompareResourceList(expResList, gotResList); !ok || cmp != 0 {
return expZoneName, resName, cmp, ok
}
}
return "", "", 0, true
}
// CompareResourceList compares `expected` and `got` ResourceList respectively, and informs the caller if the two ResourceList
// are equal. Here `equal` means the same resources with the same quantities. Returns the different resource,
// the comparison result (same semantic as strings.Compare) and a boolean that reports if the resourceLists are consistent.
// The ResourceLists are consistent only if they represent the same resource set (all the resources listed in one are
// also present in the other; neither ResourceList is a superset or a subset of the other).
func CompareResourceList(expected, got v1.ResourceList) (string, int, bool) {
if len(got) != len(expected) {
framework.Logf("-> expected=%v (len=%d) got=%v (len=%d)", expected, len(expected), got, len(got))
return "", 0, false
}
for expResName, expResQty := range expected {
gotResQty, ok := got[expResName]
if !ok {
return string(expResName), 0, false
}
if cmp := gotResQty.Cmp(expResQty); cmp != 0 {
framework.Logf("-> resource=%q cmp=%d expected=%v got=%v", expResName, cmp, expResQty, gotResQty)
return string(expResName), cmp, true
}
}
return "", 0, true
}
// IsValidNodeTopology checks whether the provided NodeResourceTopology object is well-formed, internally consistent and
// consistent with the given kubelet config object. Returns true if the NodeResourceTopology object is consistent and well
// formed, false otherwise; when it returns false, it logs the failure reason.
func IsValidNodeTopology(nodeTopology *v1alpha1.NodeResourceTopology, kubeletConfig *kubeletconfig.KubeletConfiguration) bool {
if nodeTopology == nil || len(nodeTopology.TopologyPolicies) == 0 {
framework.Logf("failed to get topology policy from the node topology resource")
return false
}
tmPolicy := string(topologypolicy.DetectTopologyPolicy(kubeletConfig.TopologyManagerPolicy, kubeletConfig.TopologyManagerScope))
if nodeTopology.TopologyPolicies[0] != tmPolicy {
framework.Logf("topology policy mismatch got %q expected %q", nodeTopology.TopologyPolicies[0], tmPolicy)
return false
}
if len(nodeTopology.Zones) == 0 {
framework.Logf("failed to get topology zones from the node topology resource")
return false
}
foundNodes := 0
for _, zone := range nodeTopology.Zones {
// TODO constant not in the APIs
if !strings.HasPrefix(strings.ToUpper(zone.Type), "NODE") {
continue
}
foundNodes++
if !isValidCostList(zone.Name, zone.Costs) {
framework.Logf("invalid cost list for zone %q", zone.Name)
return false
}
if !isValidResourceList(zone.Name, zone.Resources) {
framework.Logf("invalid resource list for zone %q", zone.Name)
return false
}
}
return foundNodes > 0
}
func isValidCostList(zoneName string, costs v1alpha1.CostList) bool {
if len(costs) == 0 {
framework.Logf("failed to get topology costs for zone %q from the node topology resource", zoneName)
return false
}
// TODO cross-validate zone names
for _, cost := range costs {
if cost.Name == "" || cost.Value < 0 {
framework.Logf("malformed cost %v for zone %q", cost, zoneName)
}
}
return true
}
func isValidResourceList(zoneName string, resources v1alpha1.ResourceInfoList) bool {
if len(resources) == 0 {
framework.Logf("failed to get topology resources for zone %q from the node topology resource", zoneName)
return false
}
foundCpu := false
for _, resource := range resources {
// TODO constant not in the APIs
if strings.ToUpper(resource.Name) == "CPU" {
foundCpu = true
}
allocatable, ok1 := resource.Allocatable.AsInt64()
capacity, ok2 := resource.Capacity.AsInt64()
if (!ok1 || !ok2) || ((allocatable < 0 || capacity < 0) || (capacity < allocatable)) {
framework.Logf("malformed resource %v for zone %q", resource, zoneName)
return false
}
}
return foundCpu
}
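To make the consistency semantics documented above concrete, here is a hedged usage sketch of CompareAllocatableResources outside the suite; the zone name and quantities are invented, and the only real identifiers are the ones defined in this package:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	testutils "sigs.k8s.io/node-feature-discovery/test/e2e/utils"
)

func main() {
	expected := map[string]v1.ResourceList{
		"node-0": {v1.ResourceCPU: resource.MustParse("4")},
	}

	// Same zone and resource names, different quantity: ok=true, cmp=-1 ("got" is smaller).
	got := map[string]v1.ResourceList{
		"node-0": {v1.ResourceCPU: resource.MustParse("3")},
	}
	zone, res, cmp, ok := testutils.CompareAllocatableResources(expected, got)
	fmt.Println(zone, res, cmp, ok) // node-0 cpu -1 true

	// Extra resource on one side: the resource sets are inconsistent, so ok=false.
	got["node-0"][v1.ResourceMemory] = resource.MustParse("1Gi")
	_, _, _, ok = testutils.CompareAllocatableResources(expected, got)
	fmt.Println(ok) // false
}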


@@ -19,20 +19,98 @@ package utils
import (
"context"
"flag"
"sync"
"time"
"github.com/onsi/ginkgo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/util/podutils"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/utils/pointer"
)
var pullIfNotPresent = flag.Bool("nfd.pull-if-not-present", false, "Pull images only if not already present, instead of always pulling")
const (
PauseImage = "k8s.gcr.io/pause"
)
// GuaranteedSleeperPod makes a Guaranteed QoS class Pod object which sleeps forever and requests `cpuLimit` CPU (exclusive CPUs when the amount is integral).
func GuaranteedSleeperPod(cpuLimit string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "sleeper-gu-pod",
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
v1.Container{
Name: "sleeper-gu-cnt",
Image: PauseImage,
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
// an integral cpuLimit (e.g. 1 core) is the minimal quantity that gets exclusive CPUs
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpuLimit),
// any random reasonable amount is fine
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100Mi"),
},
},
},
},
},
}
}
// BestEffortSleeperPod makes a Best Effort QoS class Pod object which sleeps forever
func BestEffortSleeperPod() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "sleeper-be-pod",
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
v1.Container{
Name: "sleeper-be-cnt",
Image: PauseImage,
},
},
},
}
}
// DeletePodsAsync concurrently deletes all the pods in the given name:pod_object mapping. Returns when the longest deletion completes.
func DeletePodsAsync(f *framework.Framework, podMap map[string]*v1.Pod) {
var wg sync.WaitGroup
for _, pod := range podMap {
wg.Add(1)
go func(podNS, podName string) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
DeletePodSyncByName(f, podName)
}(pod.Namespace, pod.Name)
}
wg.Wait()
}
// DeletePodSyncByName deletes the pod identified by `podName` in the current namespace
func DeletePodSyncByName(f *framework.Framework, podName string) {
gp := int64(0)
delOpts := metav1.DeleteOptions{
GracePeriodSeconds: &gp,
}
f.PodClient().DeleteSync(podName, delOpts, framework.DefaultPodDeletionTimeout)
}
// NFDMasterPod provides the NFD master pod definition
func NFDMasterPod(image string, onMasterNode bool) *v1.Pod {
p := &v1.Pod{
@@ -97,6 +175,12 @@ func NFDWorkerDaemonSet(image string, extraArgs []string) *appsv1.DaemonSet {
return newDaemonSet("nfd-worker", podSpec)
}
// NFDTopologyUpdaterDaemonSet provides the NFD topology updater daemon set definition
func NFDTopologyUpdaterDaemonSet(kc KubeletConfig, image string, extraArgs []string) *appsv1.DaemonSet {
podSpec := nfdTopologyUpdaterPodSpec(kc, image, extraArgs)
return newDaemonSet("nfd-topology-updater", podSpec)
}
// newDaemonSet provides a new daemon set for the given pod spec
func newDaemonSet(name string, podSpec *v1.PodSpec) *appsv1.DaemonSet {
return &appsv1.DaemonSet{
@@ -216,7 +300,89 @@ func nfdWorkerPodSpec(image string, extraArgs []string) *v1.PodSpec {
},
},
}
}
func nfdTopologyUpdaterPodSpec(kc KubeletConfig, image string, extraArgs []string) *v1.PodSpec {
return &v1.PodSpec{
Containers: []v1.Container{
{
Name: "node-topology-updater",
Image: image,
ImagePullPolicy: pullPolicy(),
Command: []string{"nfd-topology-updater"},
Args: append([]string{
"--kubelet-config-file=/podresources/config.yaml",
"--podresources-socket=unix:///podresources/kubelet.sock",
"--sleep-interval=3s",
"--watch-namespace=rte",
"--server=nfd-master-e2e:8080",
}, extraArgs...),
Env: []v1.EnvVar{
{
Name: "NODE_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "spec.nodeName",
},
},
},
},
SecurityContext: &v1.SecurityContext{
Capabilities: &v1.Capabilities{
Drop: []v1.Capability{"ALL"},
},
RunAsUser: pointer.Int64Ptr(0),
ReadOnlyRootFilesystem: pointer.BoolPtr(true),
AllowPrivilegeEscalation: pointer.BoolPtr(false),
},
VolumeMounts: []v1.VolumeMount{
{
Name: "kubelet-podresources-conf",
MountPath: "/podresources/config.yaml",
},
{
Name: "kubelet-podresources-sock",
MountPath: "/podresources/kubelet.sock",
},
{
Name: "host-sys",
MountPath: "/host-sys",
},
},
},
},
ServiceAccountName: "nfd-topology-updater-e2e",
DNSPolicy: v1.DNSClusterFirstWithHostNet,
Volumes: []v1.Volume{
{
Name: "kubelet-podresources-conf",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: kc.ConfigPath,
Type: newHostPathType(v1.HostPathFile),
},
},
},
{
Name: "kubelet-podresources-sock",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: kc.PodResourcesSocketPath,
Type: newHostPathType(v1.HostPathSocket),
},
},
},
{
Name: "host-sys",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/sys",
Type: newHostPathType(v1.HostPathDirectory),
},
},
},
},
}
}
func newHostPathType(typ v1.HostPathType) *v1.HostPathType {


@@ -32,17 +32,32 @@ var (
// ConfigureRBAC creates required RBAC configuration
func ConfigureRBAC(cs clientset.Interface, ns string) error {
_, err := createServiceAccount(cs, ns)
_, err := createServiceAccountMaster(cs, ns)
if err != nil {
return err
}
_, err = createClusterRole(cs)
_, err = createServiceAccountTopologyUpdater(cs, ns)
if err != nil {
return err
}
_, err = createClusterRoleBinding(cs, ns)
_, err = createClusterRoleMaster(cs)
if err != nil {
return err
}
_, err = createClusterRoleTopologyUpdater(cs)
if err != nil {
return err
}
_, err = createClusterRoleBindingMaster(cs, ns)
if err != nil {
return err
}
_, err = createClusterRoleBindingTopologyUpdater(cs, ns)
if err != nil {
return err
}
@@ -52,7 +67,15 @@ func ConfigureRBAC(cs clientset.Interface, ns string) error {
// DeconfigureRBAC removes RBAC configuration
func DeconfigureRBAC(cs clientset.Interface, ns string) error {
err := cs.RbacV1().ClusterRoleBindings().Delete(context.TODO(), "nfd-master-e2e", metav1.DeleteOptions{})
err := cs.RbacV1().ClusterRoleBindings().Delete(context.TODO(), "nfd-topology-updater-e2e", metav1.DeleteOptions{})
if err != nil {
return err
}
err = cs.RbacV1().ClusterRoleBindings().Delete(context.TODO(), "nfd-master-e2e", metav1.DeleteOptions{})
if err != nil {
return err
}
err = cs.RbacV1().ClusterRoles().Delete(context.TODO(), "nfd-topology-updater-e2e", metav1.DeleteOptions{})
if err != nil {
return err
}
@@ -60,6 +83,10 @@ func DeconfigureRBAC(cs clientset.Interface, ns string) error {
if err != nil {
return err
}
err = cs.CoreV1().ServiceAccounts(ns).Delete(context.TODO(), "nfd-topology-updater-e2e", metav1.DeleteOptions{})
if err != nil {
return err
}
err = cs.CoreV1().ServiceAccounts(ns).Delete(context.TODO(), "nfd-master-e2e", metav1.DeleteOptions{})
if err != nil {
return err
@@ -67,8 +94,8 @@ func DeconfigureRBAC(cs clientset.Interface, ns string) error {
return nil
}
// Configure service account required by NFD
func createServiceAccount(cs clientset.Interface, ns string) (*v1.ServiceAccount, error) {
// Configure service account required by NFD Master
func createServiceAccountMaster(cs clientset.Interface, ns string) (*v1.ServiceAccount, error) {
sa := &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "nfd-master-e2e",
@@ -78,8 +105,19 @@ func createServiceAccount(cs clientset.Interface, ns string) (*v1.ServiceAccount
return cs.CoreV1().ServiceAccounts(ns).Create(context.TODO(), sa, metav1.CreateOptions{})
}
// Configure cluster role required by NFD
func createClusterRole(cs clientset.Interface) (*rbacv1.ClusterRole, error) {
// Configure service account required by NFD Topology Updater
func createServiceAccountTopologyUpdater(cs clientset.Interface, ns string) (*v1.ServiceAccount, error) {
sa := &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "nfd-topology-updater-e2e",
Namespace: ns,
},
}
return cs.CoreV1().ServiceAccounts(ns).Create(context.TODO(), sa, metav1.CreateOptions{})
}
// Configure cluster role required by NFD Master
func createClusterRoleMaster(cs clientset.Interface) (*rbacv1.ClusterRole, error) {
cr := &rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: "nfd-master-e2e",
@@ -114,8 +152,38 @@ func createClusterRole(cs clientset.Interface) (*rbacv1.ClusterRole, error) {
return cs.RbacV1().ClusterRoles().Update(context.TODO(), cr, metav1.UpdateOptions{})
}
// Configure cluster role binding required by NFD
func createClusterRoleBinding(cs clientset.Interface, ns string) (*rbacv1.ClusterRoleBinding, error) {
// Configure cluster role required by NFD Topology Updater
func createClusterRoleTopologyUpdater(cs clientset.Interface) (*rbacv1.ClusterRole, error) {
cr := &rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: "nfd-topology-updater-e2e",
},
// the Topology Updater doesn't need to access any kube object:
// it reads from the podresources socket and it sends updates to the
// nfd-master using the gRPC interface.
Rules: []rbacv1.PolicyRule{
{
APIGroups: []string{""},
Resources: []string{"pods"},
Verbs: []string{"get", "list", "watch"},
},
},
}
if *openShift {
cr.Rules = append(cr.Rules,
rbacv1.PolicyRule{
// needed on OpenShift clusters
APIGroups: []string{"security.openshift.io"},
Resources: []string{"securitycontextconstraints"},
ResourceNames: []string{"hostaccess"},
Verbs: []string{"use"},
})
}
return cs.RbacV1().ClusterRoles().Update(context.TODO(), cr, metav1.UpdateOptions{})
}
// Configure cluster role binding required by NFD Master
func createClusterRoleBindingMaster(cs clientset.Interface, ns string) (*rbacv1.ClusterRoleBinding, error) {
crb := &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "nfd-master-e2e",
@@ -136,3 +204,26 @@ func createClusterRoleBinding(cs clientset.Interface, ns string) (*rbacv1.Cluste
return cs.RbacV1().ClusterRoleBindings().Update(context.TODO(), crb, metav1.UpdateOptions{})
}
// Configure cluster role binding required by NFD Topology Updater
func createClusterRoleBindingTopologyUpdater(cs clientset.Interface, ns string) (*rbacv1.ClusterRoleBinding, error) {
crb := &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "nfd-topology-updater-e2e",
},
Subjects: []rbacv1.Subject{
{
Kind: rbacv1.ServiceAccountKind,
Name: "nfd-topology-updater-e2e",
Namespace: ns,
},
},
RoleRef: rbacv1.RoleRef{
APIGroup: rbacv1.GroupName,
Kind: "ClusterRole",
Name: "nfd-topology-updater-e2e",
},
}
return cs.RbacV1().ClusterRoleBindings().Update(context.TODO(), crb, metav1.UpdateOptions{})
}