From 6de13fe45633589afa6c574f4acc2e9b9e9f1fd8 Mon Sep 17 00:00:00 2001
From: Talor Itzhak
Date: Thu, 12 Jan 2023 14:50:32 +0200
Subject: [PATCH] e2e: reactive updates test

Signed-off-by: Talor Itzhak
---
 test/e2e/topology_updater_test.go | 91 +++++++++++++++++++++++++++++--
 test/e2e/utils/pod/pod.go         | 18 +++++-
 2 files changed, 101 insertions(+), 8 deletions(-)

diff --git a/test/e2e/topology_updater_test.go b/test/e2e/topology_updater_test.go
index 159bf3f38..005cac9dc 100644
--- a/test/e2e/topology_updater_test.go
+++ b/test/e2e/topology_updater_test.go
@@ -21,8 +21,6 @@ import (
 	"fmt"
 	"time"
 
-	"k8s.io/apimachinery/pkg/api/resource"
-
 	"github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -34,6 +32,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -115,8 +114,7 @@ var _ = SIGDescribe("NFD topology updater", func() {
 
 			kcfg := cfg.GetKubeletConfig()
 			By(fmt.Sprintf("Using config (%#v)", kcfg))
-
-			podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage())}
+			podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage()), testpod.SpecWithContainerExtraArgs("-sleep-interval=3s")}
 			topologyUpdaterDaemonSet = testds.NFDTopologyUpdater(kcfg, podSpecOpts...)
 		})
 
@@ -184,7 +182,7 @@ var _ = SIGDescribe("NFD topology updater", func() {
 
 			cooldown := 30 * time.Second
 			By(fmt.Sprintf("getting the updated topology - sleeping for %v", cooldown))
-			// the object, hance the resource version must NOT change, so we can only sleep
+			// the object, hence the resource version must NOT change, so we can only sleep
 			time.Sleep(cooldown)
 			By("checking the changes in the updated topology - expecting none")
 			finalNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
@@ -263,7 +261,90 @@ var _ = SIGDescribe("NFD topology updater", func() {
 				return true
 			}, time.Minute, 5*time.Second).Should(BeTrue(), "didn't get updated node topology info")
 		})
+	})
+	When("sleep interval disabled", func() {
+		ginkgo.BeforeEach(func() {
+			cfg, err := testutils.GetConfig()
+			Expect(err).ToNot(HaveOccurred())
+
+			kcfg := cfg.GetKubeletConfig()
+			By(fmt.Sprintf("Using config (%#v)", kcfg))
+			podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage()), testpod.SpecWithContainerExtraArgs("-sleep-interval=0s")}
+			topologyUpdaterDaemonSet = testds.NFDTopologyUpdater(kcfg, podSpecOpts...)
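+			// Note: with -sleep-interval=0s the periodic update loop is disabled,
+			// so any NodeResourceTopology change observed in the test below should
+			// come from the reactive (pod event driven) update path.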
+		})
+		It("should still create CRs using reactive updates", func() {
+			nodes, err := testutils.FilterNodesWithEnoughCores(workerNodes, "1000m")
+			Expect(err).NotTo(HaveOccurred())
+			if len(nodes) < 1 {
+				Skip("not enough allocatable cores for this test")
+			}
+
+			By("creating a pod consuming exclusive CPUs")
+			sleeperPod := testpod.GuaranteedSleeper(testpod.WithLimits(
+				corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("1000m"),
+					// any random reasonable amount is fine
+					corev1.ResourceMemory: resource.MustParse("100Mi"),
+				}))
+			// in case there is more than a single node in the cluster
+			// we need to set the node name, so we'll have certainty about
+			// which node we need to examine
+			sleeperPod.Spec.NodeName = topologyUpdaterNode.Name
+
+			podMap := make(map[string]*corev1.Pod)
+			pod := e2epod.NewPodClient(f).CreateSync(sleeperPod)
+			podMap[pod.Name] = pod
+			defer testpod.DeleteAsync(f, podMap)
+
+			By("checking initial CR created")
+			initialNodeTopo := testutils.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
+
+			By("creating an additional pod consuming exclusive CPUs")
+			sleeperPod2 := testpod.GuaranteedSleeper(testpod.WithLimits(
+				corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("1000m"),
+					// any random reasonable amount is fine
+					corev1.ResourceMemory: resource.MustParse("100Mi"),
+				}))
+
+			// in case there is more than a single node in the cluster
+			// we need to set the node name, so we'll have certainty about
+			// which node we need to examine
+			sleeperPod2.Spec.NodeName = topologyUpdaterNode.Name
+			sleeperPod2.Name = sleeperPod2.Name + "2"
+			pod2 := e2epod.NewPodClient(f).CreateSync(sleeperPod2)
+			podMap[pod2.Name] = pod2
+
+			By("checking the changes in the updated topology")
+			var finalNodeTopo *v1alpha2.NodeResourceTopology
+			Eventually(func() bool {
+				finalNodeTopo, err = topologyClient.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), topologyUpdaterNode.Name, metav1.GetOptions{})
+				if err != nil {
+					framework.Logf("failed to get the node topology resource: %v", err)
+					return false
+				}
+				if finalNodeTopo.ObjectMeta.ResourceVersion == initialNodeTopo.ObjectMeta.ResourceVersion {
+					framework.Logf("node topology resource %s was not updated", topologyUpdaterNode.Name)
+				}
+
+				initialAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(initialNodeTopo)
+				finalAllocRes := testutils.AllocatableResourceListFromNodeResourceTopology(finalNodeTopo)
+				if len(initialAllocRes) == 0 || len(finalAllocRes) == 0 {
+					Fail(fmt.Sprintf("failed to find allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes))
+				}
+
+				zoneName, resName, isLess := lessAllocatableResources(initialAllocRes, finalAllocRes)
+				framework.Logf("zone=%q resource=%q isLess=%v", zoneName, resName, isLess)
+				if !isLess {
+					framework.Logf("final allocatable resources not decreased - initial=%v final=%v", initialAllocRes, finalAllocRes)
+					return false
+				}
+				return true
+				// timeout must be lower than sleep interval
+				// otherwise we won't be able to determine what
+				// triggered the CR update
+			}, time.Second*20, 5*time.Second).Should(BeTrue(), "didn't get updated node topology info")
+		})
 	})
 
 	When("topology-updater configure to exclude memory", func() {
diff --git a/test/e2e/utils/pod/pod.go b/test/e2e/utils/pod/pod.go
index ec719f331..965c78763 100644
--- a/test/e2e/utils/pod/pod.go
+++ b/test/e2e/utils/pod/pod.go
@@ -364,8 +364,7 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1.
 				Command: []string{"nfd-topology-updater"},
 				Args: []string{
 					"-kubelet-config-uri=file:///podresources/config.yaml",
-					"-podresources-socket=unix:///podresources/kubelet.sock",
-					"-sleep-interval=3s",
+					"-podresources-socket=unix:///host-var/lib/kubelet/pod-resources/kubelet.sock",
 					"-watch-namespace=rte"},
 				Env: []corev1.EnvVar{
 					{
@@ -389,13 +388,17 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1.
 					},
 				},
 				VolumeMounts: []corev1.VolumeMount{
+					{
+						Name:      "kubelet-state-files",
+						MountPath: "/host-var/lib/kubelet",
+					},
 					{
 						Name:      "kubelet-podresources-conf",
 						MountPath: "/podresources/config.yaml",
 					},
 					{
 						Name:      "kubelet-podresources-sock",
-						MountPath: "/podresources/kubelet.sock",
+						MountPath: "/host-var/lib/kubelet/pod-resources/kubelet.sock",
 					},
 					{
 						Name:      "host-sys",
@@ -407,6 +410,15 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1.
 		ServiceAccountName: "nfd-topology-updater-e2e",
 		DNSPolicy:          corev1.DNSClusterFirstWithHostNet,
 		Volumes: []corev1.Volume{
+			{
+				Name: "kubelet-state-files",
+				VolumeSource: corev1.VolumeSource{
+					HostPath: &corev1.HostPathVolumeSource{
+						Path: "/var/lib/kubelet",
+						Type: newHostPathType(corev1.HostPathDirectory),
+					},
+				},
+			},
 			{
 				Name: "kubelet-podresources-conf",
 				VolumeSource: corev1.VolumeSource{