1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2025-03-31 04:04:51 +00:00
This commit is contained in:
AllenXu93 2025-03-28 17:36:29 +08:00 committed by GitHub
commit 1246f52dff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 188 additions and 50 deletions

View file

@ -19,13 +19,12 @@ package resourcemonitor
import (
"context"
"fmt"
"k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"strconv"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
client "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
@ -57,58 +56,49 @@ func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi.
}
// isWatchable tells if the given namespace should be watched.
func (resMon *PodResourcesScanner) isWatchable(podNamespace string, podName string, hasDevice bool) (bool, bool, error) {
pod, err := resMon.k8sClient.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
// In Scan(), a pod whose watchable result is false is skipped,
// so we can return early when the pod's namespace is not being watched.
func (resMon *PodResourcesScanner) isWatchable(podResource *podresourcesapi.PodResources) (bool, bool, error) {
if resMon.namespace != "*" && resMon.namespace != podResource.Namespace {
return false, false, nil
}
pod, err := resMon.k8sClient.CoreV1().Pods(podResource.Namespace).Get(context.TODO(), podResource.Name, metav1.GetOptions{})
if err != nil {
return false, false, err
}
isIntegralGuaranteed := hasExclusiveCPUs(pod)
isPodHasIntegralCPUs := podHasIntegralCPUs(pod)
isPodGuaranteed := qos.GetPodQOS(pod) == corev1.PodQOSGuaranteed
if resMon.namespace == "*" && (isIntegralGuaranteed || hasDevice) {
return true, isIntegralGuaranteed, nil
}
// TODO: add an explicit check for guaranteed pods and pods with devices
return resMon.namespace == podNamespace && (isIntegralGuaranteed || hasDevice), isIntegralGuaranteed, nil
return isPodGuaranteed || hasDevice(podResource), isPodHasIntegralCPUs, nil
}
// hasExclusiveCPUs returns true if a guaranteed pod is allocated exclusive CPUs else returns false.
// podHasIntegralCPUs returns true if at least one init or app container of the pod requests an integral number of CPUs, else returns false.
// In isWatchable() function we check for the pod QoS and proceed if it is guaranteed (i.e. request == limit)
// and hence we only check for request in the function below.
func hasExclusiveCPUs(pod *corev1.Pod) bool {
var totalCPU int64
var cpuQuantity resource.Quantity
func podHasIntegralCPUs(pod *corev1.Pod) bool {
for _, container := range pod.Spec.InitContainers {
var ok bool
if cpuQuantity, ok = container.Resources.Requests[corev1.ResourceCPU]; !ok {
continue
}
totalCPU += cpuQuantity.Value()
isInitContainerGuaranteed := hasIntegralCPUs(pod, &container)
if !isInitContainerGuaranteed {
return false
if hasIntegralCPUs(&container) {
return true
}
}
for _, container := range pod.Spec.Containers {
var ok bool
if cpuQuantity, ok = container.Resources.Requests[corev1.ResourceCPU]; !ok {
continue
}
totalCPU += cpuQuantity.Value()
isAppContainerGuaranteed := hasIntegralCPUs(pod, &container)
if !isAppContainerGuaranteed {
return false
if hasIntegralCPUs(&container) {
return true
}
}
//No CPUs requested in all the containers in the pod
return totalCPU != 0
// None of the containers of the pod requests integral CPUs
return false
}
// hasIntegralCPUs returns true if a container in pod is requesting integral CPUs else returns false
func hasIntegralCPUs(pod *corev1.Pod, container *corev1.Container) bool {
cpuQuantity := container.Resources.Requests[corev1.ResourceCPU]
func hasIntegralCPUs(container *corev1.Container) bool {
	// A container without any CPU request cannot be asking for whole CPUs.
	requested, found := container.Resources.Requests[corev1.ResourceCPU]
	if !found {
		return false
	}
	// The request is integral when converting its whole-core value back to
	// millicores reproduces the exact milli value, i.e. nothing fractional
	// was lost in Value().
	wholeCoresInMilli := requested.Value() * 1000
	return wholeCoresInMilli == requested.MilliValue()
}
@ -146,8 +136,7 @@ func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) {
for _, podResource := range respPodResources {
klog.InfoS("scanning pod", "podName", podResource.GetName())
hasDevice := hasDevice(podResource)
isWatchable, isIntegralGuaranteed, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName(), hasDevice)
isWatchable, isExclusiveCPUs, err := resMon.isWatchable(podResource)
if err != nil {
return ScanResponse{}, fmt.Errorf("checking if pod in a namespace is watchable, namespace:%v, pod name %v: %w", podResource.GetNamespace(), podResource.GetName(), err)
}
@ -165,19 +154,17 @@ func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) {
Name: container.Name,
}
if isIntegralGuaranteed {
cpuIDs := container.GetCpuIds()
if len(cpuIDs) > 0 {
var resCPUs []string
for _, cpuID := range container.GetCpuIds() {
resCPUs = append(resCPUs, strconv.FormatInt(cpuID, 10))
}
contRes.Resources = []ResourceInfo{
{
Name: corev1.ResourceCPU,
Data: resCPUs,
},
}
cpuIDs := container.GetCpuIds()
if len(cpuIDs) > 0 && isExclusiveCPUs {
var resCPUs []string
for _, cpuID := range container.GetCpuIds() {
resCPUs = append(resCPUs, strconv.FormatInt(cpuID, 10))
}
contRes.Resources = []ResourceInfo{
{
Name: corev1.ResourceCPU,
Data: resCPUs,
},
}
}

View file

@ -165,6 +165,9 @@ func TestPodScanner(t *testing.T) {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli := fakeclient.NewSimpleClientset(pod)
@ -280,6 +283,9 @@ func TestPodScanner(t *testing.T) {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli = fakeclient.NewSimpleClientset(pod)
@ -368,6 +374,9 @@ func TestPodScanner(t *testing.T) {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli = fakeclient.NewSimpleClientset(pod)
resScan.(*PodResourcesScanner).k8sClient = fakeCli
@ -458,6 +467,9 @@ func TestPodScanner(t *testing.T) {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli = fakeclient.NewSimpleClientset(pod)
resScan.(*PodResourcesScanner).k8sClient = fakeCli
@ -628,6 +640,9 @@ func TestPodScanner(t *testing.T) {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli = fakeclient.NewSimpleClientset(pod)
resScan.(*PodResourcesScanner).k8sClient = fakeCli
@ -825,6 +840,9 @@ func TestPodScanner(t *testing.T) {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli = fakeclient.NewSimpleClientset(pod)
resScan.(*PodResourcesScanner).k8sClient = fakeCli
@ -1029,6 +1047,9 @@ func TestPodScanner(t *testing.T) {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli = fakeclient.NewSimpleClientset(pod)
resScan.(*PodResourcesScanner).k8sClient = fakeCli
@ -1113,6 +1134,9 @@ func TestPodScanner(t *testing.T) {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli = fakeclient.NewSimpleClientset(pod)
resScan.(*PodResourcesScanner).k8sClient = fakeCli
@ -1145,5 +1169,129 @@ func TestPodScanner(t *testing.T) {
So(reflect.DeepEqual(res.PodResources, expected), ShouldBeTrue)
})
Convey("When I successfully get valid response for guaranteed pods with not cpu pin containers", func() {
resp := &v1.ListPodResourcesResponse{
PodResources: []*v1.PodResources{
{
Name: "test-pod-0",
Namespace: "pod-res-test",
Containers: []*v1.ContainerResources{
{
Name: "test-cnt-0",
CpuIds: []int64{0, 1},
Devices: []*v1.ContainerDevices{
{
ResourceName: "fake.io/resource",
DeviceIds: []string{"devA"},
},
},
},
{
Name: "test-cnt-1",
Devices: []*v1.ContainerDevices{
{
ResourceName: "fake.io/resource",
DeviceIds: []string{"devA"},
},
},
},
},
},
},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-0",
Namespace: "pod-res-test",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-cnt-0",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName("fake.io/resource"): *resource.NewQuantity(1, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI),
corev1.ResourceCPU: resource.MustParse("2"),
},
Limits: corev1.ResourceList{
corev1.ResourceName("fake.io/resource"): *resource.NewQuantity(1, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI),
corev1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Name: "test-cnt-1",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName("fake.io/resource"): *resource.NewQuantity(1, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI),
corev1.ResourceCPU: resource.MustParse("1500m"),
},
Limits: corev1.ResourceList{
corev1.ResourceName("fake.io/resource"): *resource.NewQuantity(1, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI),
corev1.ResourceCPU: resource.MustParse("1500m"),
},
},
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
fakeCli = fakeclient.NewSimpleClientset(pod)
resScan.(*PodResourcesScanner).k8sClient = fakeCli
res, err := resScan.Scan()
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Convey("Return PodResources should have values", func() {
So(len(res.PodResources), ShouldBeGreaterThan, 0)
})
expected := []PodResources{
{
Name: "test-pod-0",
Namespace: "pod-res-test",
Containers: []ContainerResources{
{
Name: "test-cnt-0",
Resources: []ResourceInfo{
{
Name: corev1.ResourceCPU,
Data: []string{"0", "1"},
},
{
Name: "fake.io/resource",
Data: []string{"devA"},
},
},
},
{
Name: "test-cnt-1",
Resources: []ResourceInfo{
{
Name: "fake.io/resource",
Data: []string{"devA"},
},
},
},
},
},
}
So(reflect.DeepEqual(res.PodResources, expected), ShouldBeTrue)
})
})
}

View file

@ -61,6 +61,9 @@ func GuaranteedSleeper(opts ...func(pod *corev1.Pod)) *corev1.Pod {
},
},
},
Status: corev1.PodStatus{
QOSClass: corev1.PodQOSGuaranteed,
},
}
for _, o := range opts {
o(p)