2021-05-13 10:55:33 +00:00
/ *
Copyright 2021 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package resourcemonitor
import (
"context"
"fmt"
2021-07-14 00:23:19 +00:00
"strconv"
2021-05-13 10:55:33 +00:00
2022-10-14 12:28:52 +00:00
corev1 "k8s.io/api/core/v1"
2021-07-14 00:23:19 +00:00
"k8s.io/apimachinery/pkg/api/resource"
2024-01-22 13:24:16 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
client "k8s.io/client-go/kubernetes"
2021-05-13 10:55:33 +00:00
"k8s.io/klog/v2"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
2021-07-13 23:50:05 +00:00
2023-02-01 11:41:09 +00:00
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"github.com/k8stopologyawareschedwg/podfingerprint"
2021-05-13 10:55:33 +00:00
)
type PodResourcesScanner struct {
namespace string
podResourceClient podresourcesapi . PodResourcesListerClient
2024-01-22 13:24:16 +00:00
k8sClient client . Interface
2023-02-01 11:41:09 +00:00
podFingerprint bool
2021-05-13 10:55:33 +00:00
}
2022-03-03 01:12:46 +00:00
// NewPodResourcesScanner creates a new ResourcesScanner instance
2024-01-22 13:24:16 +00:00
func NewPodResourcesScanner ( namespace string , podResourceClient podresourcesapi . PodResourcesListerClient , k8sClient client . Interface , podFingerprint bool ) ( ResourcesScanner , error ) {
2021-05-13 10:55:33 +00:00
resourcemonitorInstance := & PodResourcesScanner {
namespace : namespace ,
podResourceClient : podResourceClient ,
2024-01-22 13:24:16 +00:00
k8sClient : k8sClient ,
2023-02-01 11:41:09 +00:00
podFingerprint : podFingerprint ,
2021-05-13 10:55:33 +00:00
}
if resourcemonitorInstance . namespace != "*" {
2023-05-03 08:32:53 +00:00
klog . InfoS ( "watching one namespace" , "namespace" , resourcemonitorInstance . namespace )
2021-05-13 10:55:33 +00:00
} else {
2023-05-03 08:32:53 +00:00
klog . InfoS ( "watching all namespaces" )
2021-05-13 10:55:33 +00:00
}
return resourcemonitorInstance , nil
}
// isWatchable tells if the the given namespace should be watched.
2021-07-14 00:23:19 +00:00
func ( resMon * PodResourcesScanner ) isWatchable ( podNamespace string , podName string , hasDevice bool ) ( bool , bool , error ) {
2024-01-22 13:24:16 +00:00
pod , err := resMon . k8sClient . CoreV1 ( ) . Pods ( podNamespace ) . Get ( context . TODO ( ) , podName , metav1 . GetOptions { } )
2021-07-13 23:50:05 +00:00
if err != nil {
2021-07-14 00:23:19 +00:00
return false , false , err
2021-07-13 23:50:05 +00:00
}
2021-07-14 00:23:19 +00:00
isIntegralGuaranteed := hasExclusiveCPUs ( pod )
2021-07-13 23:50:05 +00:00
2021-07-14 00:23:19 +00:00
if resMon . namespace == "*" && ( isIntegralGuaranteed || hasDevice ) {
return true , isIntegralGuaranteed , nil
2021-05-13 10:55:33 +00:00
}
2021-07-14 00:23:19 +00:00
// TODO: add an explicit check for guaranteed pods and pods with devices
return resMon . namespace == podNamespace && ( isIntegralGuaranteed || hasDevice ) , isIntegralGuaranteed , nil
2021-07-13 23:50:05 +00:00
}
2021-11-05 14:42:49 +00:00
// hasExclusiveCPUs returns true if a guaranteed pod is allocated exclusive CPUs else returns false.
2021-07-13 23:50:05 +00:00
// In isWatchable() function we check for the pod QoS and proceed if it is guaranteed (i.e. request == limit)
// and hence we only check for request in the function below.
2022-10-14 12:28:52 +00:00
func hasExclusiveCPUs ( pod * corev1 . Pod ) bool {
2021-07-14 00:23:19 +00:00
var totalCPU int64
var cpuQuantity resource . Quantity
2021-07-13 23:50:05 +00:00
for _ , container := range pod . Spec . InitContainers {
2021-07-14 00:23:19 +00:00
var ok bool
2022-10-14 12:28:52 +00:00
if cpuQuantity , ok = container . Resources . Requests [ corev1 . ResourceCPU ] ; ! ok {
2021-07-13 23:50:05 +00:00
continue
}
2021-07-14 00:23:19 +00:00
totalCPU += cpuQuantity . Value ( )
2021-07-13 23:50:05 +00:00
isInitContainerGuaranteed := hasIntegralCPUs ( pod , & container )
if ! isInitContainerGuaranteed {
return false
}
}
for _ , container := range pod . Spec . Containers {
2021-07-14 00:23:19 +00:00
var ok bool
2022-10-14 12:28:52 +00:00
if cpuQuantity , ok = container . Resources . Requests [ corev1 . ResourceCPU ] ; ! ok {
2021-07-13 23:50:05 +00:00
continue
}
2021-07-14 00:23:19 +00:00
totalCPU += cpuQuantity . Value ( )
2021-07-13 23:50:05 +00:00
isAppContainerGuaranteed := hasIntegralCPUs ( pod , & container )
if ! isAppContainerGuaranteed {
return false
}
}
2021-07-14 00:23:19 +00:00
//No CPUs requested in all the containers in the pod
return totalCPU != 0
2021-07-13 23:50:05 +00:00
}
// hasIntegralCPUs returns true if a container in pod is requesting integral CPUs else returns false
2022-10-14 12:28:52 +00:00
func hasIntegralCPUs ( pod * corev1 . Pod , container * corev1 . Container ) bool {
cpuQuantity := container . Resources . Requests [ corev1 . ResourceCPU ]
2021-07-13 23:50:05 +00:00
return cpuQuantity . Value ( ) * 1000 == cpuQuantity . MilliValue ( )
2021-05-13 10:55:33 +00:00
}
// Scan gathers all the PodResources from the system, using the podresources API client.
2023-02-07 11:13:29 +00:00
func ( resMon * PodResourcesScanner ) Scan ( ) ( ScanResponse , error ) {
2021-05-13 10:55:33 +00:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , defaultPodResourcesTimeout )
defer cancel ( )
// Pod Resource API client
resp , err := resMon . podResourceClient . List ( ctx , & podresourcesapi . ListPodResourcesRequest { } )
if err != nil {
2023-02-07 11:13:29 +00:00
return ScanResponse { } , fmt . Errorf ( "can't receive response: %v.Get(_) = _, %w" , resMon . podResourceClient , err )
2021-05-13 10:55:33 +00:00
}
2023-02-01 11:41:09 +00:00
respPodResources := resp . GetPodResources ( )
retVal := ScanResponse {
Attributes : v1alpha2 . AttributeList { } ,
}
if resMon . podFingerprint && len ( respPodResources ) > 0 {
var status podfingerprint . Status
podFingerprintSign , err := computePodFingerprint ( respPodResources , & status )
if err != nil {
2023-05-03 08:32:53 +00:00
klog . ErrorS ( err , "failed to calculate fingerprint" )
2023-02-01 11:41:09 +00:00
} else {
2023-05-03 08:32:53 +00:00
klog . InfoS ( "podFingerprint calculated" , "status" , status . Repr ( ) )
2023-02-01 11:41:09 +00:00
retVal . Attributes = append ( retVal . Attributes , v1alpha2 . AttributeInfo {
Name : podfingerprint . Attribute ,
Value : podFingerprintSign ,
} )
}
}
2021-05-13 10:55:33 +00:00
var podResData [ ] PodResources
2023-02-01 11:41:09 +00:00
for _ , podResource := range respPodResources {
2023-05-03 08:32:53 +00:00
klog . InfoS ( "scanning pod" , "podName" , podResource . GetName ( ) )
2021-07-14 00:23:19 +00:00
hasDevice := hasDevice ( podResource )
isWatchable , isIntegralGuaranteed , err := resMon . isWatchable ( podResource . GetNamespace ( ) , podResource . GetName ( ) , hasDevice )
2021-07-13 23:50:05 +00:00
if err != nil {
2024-01-22 20:45:15 +00:00
return ScanResponse { } , fmt . Errorf ( "checking if pod in a namespace is watchable, namespace:%v, pod name %v: %w" , podResource . GetNamespace ( ) , podResource . GetName ( ) , err )
2021-07-13 23:50:05 +00:00
}
if ! isWatchable {
2021-05-13 10:55:33 +00:00
continue
}
podRes := PodResources {
Name : podResource . GetName ( ) ,
Namespace : podResource . GetNamespace ( ) ,
}
for _ , container := range podResource . GetContainers ( ) {
contRes := ContainerResources {
Name : container . Name ,
}
2021-07-14 00:23:19 +00:00
if isIntegralGuaranteed {
cpuIDs := container . GetCpuIds ( )
if len ( cpuIDs ) > 0 {
var resCPUs [ ] string
for _ , cpuID := range container . GetCpuIds ( ) {
resCPUs = append ( resCPUs , strconv . FormatInt ( cpuID , 10 ) )
}
contRes . Resources = [ ] ResourceInfo {
{
2022-10-14 12:28:52 +00:00
Name : corev1 . ResourceCPU ,
2021-07-14 00:23:19 +00:00
Data : resCPUs ,
} ,
}
2021-05-13 10:55:33 +00:00
}
}
for _ , device := range container . GetDevices ( ) {
2021-11-04 08:16:57 +00:00
numaNodesIDs := getNumaNodeIds ( device . GetTopology ( ) )
2021-05-13 10:55:33 +00:00
contRes . Resources = append ( contRes . Resources , ResourceInfo {
2022-10-14 12:28:52 +00:00
Name : corev1 . ResourceName ( device . ResourceName ) ,
2021-11-04 08:16:57 +00:00
Data : device . DeviceIds ,
NumaNodeIds : numaNodesIDs ,
} )
}
for _ , block := range container . GetMemory ( ) {
if block . GetSize_ ( ) == 0 {
continue
}
topology := getNumaNodeIds ( block . GetTopology ( ) )
contRes . Resources = append ( contRes . Resources , ResourceInfo {
2022-10-14 12:28:52 +00:00
Name : corev1 . ResourceName ( block . MemoryType ) ,
2021-11-04 08:16:57 +00:00
Data : [ ] string { fmt . Sprintf ( "%d" , block . GetSize_ ( ) ) } ,
NumaNodeIds : topology ,
2021-05-13 10:55:33 +00:00
} )
}
if len ( contRes . Resources ) == 0 {
continue
}
podRes . Containers = append ( podRes . Containers , contRes )
}
if len ( podRes . Containers ) == 0 {
continue
}
podResData = append ( podResData , podRes )
}
2023-02-01 11:41:09 +00:00
retVal . PodResources = podResData
return retVal , nil
2021-05-13 10:55:33 +00:00
}
2021-07-14 00:23:19 +00:00
func hasDevice ( podResource * podresourcesapi . PodResources ) bool {
for _ , container := range podResource . GetContainers ( ) {
if len ( container . GetDevices ( ) ) > 0 {
return true
}
}
2023-05-03 08:32:53 +00:00
klog . InfoS ( "pod doesn't have devices" , "podName" , podResource . GetName ( ) )
2021-07-14 00:23:19 +00:00
return false
}
2021-11-04 08:16:57 +00:00
// getNumaNodeIds extracts the NUMA node IDs from the given topology info.
// Nil node entries are skipped. The result is nil when topologyInfo is nil
// or carries no usable node.
func getNumaNodeIds(topologyInfo *podresourcesapi.TopologyInfo) []int {
	var nodeIDs []int
	if topologyInfo == nil {
		return nodeIDs
	}

	for _, numaNode := range topologyInfo.Nodes {
		if numaNode == nil {
			continue
		}
		nodeIDs = append(nodeIDs, int(numaNode.ID))
	}
	return nodeIDs
}
2023-02-01 11:41:09 +00:00
// computePodFingerprint feeds the namespace and name of every listed pod into
// a tracing fingerprint bound to status, and returns the resulting signature.
// The first pod that cannot be added aborts the computation; its error is
// returned along with an empty signature.
func computePodFingerprint(podResources []*podresourcesapi.PodResources, status *podfingerprint.Status) (string, error) {
	fp := podfingerprint.NewTracingFingerprint(len(podResources), status)

	for idx := range podResources {
		entry := podResources[idx]
		if err := fp.Add(entry.Namespace, entry.Name); err != nil {
			return "", err
		}
	}

	return fp.Sign(), nil
}