mirror of
https://github.com/kubernetes-sigs/node-feature-discovery.git
synced 2024-12-14 11:57:51 +00:00
Merge pull request #1813 from marquiz/devel/gc-metalister
nfd-gc: only fetch object metadata
This commit is contained in:
commit
57f1b79856
2 changed files with 82 additions and 78 deletions
|
@ -21,23 +21,28 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
|
||||
topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
metadataclient "k8s.io/client-go/metadata"
|
||||
"k8s.io/client-go/metadata/metadatainformer"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
nfdclientset "sigs.k8s.io/node-feature-discovery/api/generated/clientset/versioned"
|
||||
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/utils"
|
||||
"sigs.k8s.io/node-feature-discovery/pkg/version"
|
||||
)
|
||||
|
||||
var (
|
||||
gvrNF = nfdv1alpha1.SchemeGroupVersion.WithResource("nodefeatures")
|
||||
gvrNRT = topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
|
||||
gvrNode = corev1.SchemeGroupVersion.WithResource("nodes")
|
||||
)
|
||||
|
||||
// Args are the command line arguments
|
||||
type Args struct {
|
||||
GCPeriod time.Duration
|
||||
|
@ -51,11 +56,10 @@ type NfdGarbageCollector interface {
|
|||
}
|
||||
|
||||
type nfdGarbageCollector struct {
|
||||
args *Args
|
||||
stopChan chan struct{}
|
||||
nfdClient nfdclientset.Interface
|
||||
topoClient topologyclientset.Interface
|
||||
factory informers.SharedInformerFactory
|
||||
args *Args
|
||||
stopChan chan struct{}
|
||||
client metadataclient.Interface
|
||||
factory metadatainformer.SharedInformerFactory
|
||||
}
|
||||
|
||||
func New(args *Args) (NfdGarbageCollector, error) {
|
||||
|
@ -64,20 +68,19 @@ func New(args *Args) (NfdGarbageCollector, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
clientset := kubernetes.NewForConfigOrDie(kubeconfig)
|
||||
cli := metadataclient.NewForConfigOrDie(kubeconfig)
|
||||
|
||||
return &nfdGarbageCollector{
|
||||
args: args,
|
||||
stopChan: make(chan struct{}),
|
||||
topoClient: topologyclientset.NewForConfigOrDie(kubeconfig),
|
||||
nfdClient: nfdclientset.NewForConfigOrDie(kubeconfig),
|
||||
factory: informers.NewSharedInformerFactory(clientset, 5*time.Minute),
|
||||
args: args,
|
||||
stopChan: make(chan struct{}),
|
||||
client: cli,
|
||||
factory: metadatainformer.NewSharedInformerFactory(cli, 0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *nfdGarbageCollector) deleteNodeFeature(namespace, name string) {
|
||||
kind := "NodeFeature"
|
||||
if err := n.nfdClient.NfdV1alpha1().NodeFeatures(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil {
|
||||
if err := n.client.Resource(gvrNF).Namespace(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
klog.V(2).InfoS("NodeFeature not found, omitting deletion", "nodefeature", klog.KRef(namespace, name))
|
||||
return
|
||||
|
@ -93,7 +96,7 @@ func (n *nfdGarbageCollector) deleteNodeFeature(namespace, name string) {
|
|||
|
||||
func (n *nfdGarbageCollector) deleteNRT(nodeName string) {
|
||||
kind := "NodeResourceTopology"
|
||||
if err := n.topoClient.TopologyV1alpha2().NodeResourceTopologies().Delete(context.TODO(), nodeName, metav1.DeleteOptions{}); err != nil {
|
||||
if err := n.client.Resource(gvrNRT).Delete(context.TODO(), nodeName, metav1.DeleteOptions{}); err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
klog.V(2).InfoS("NodeResourceTopology not found, omitting deletion", "nodeName", nodeName)
|
||||
return
|
||||
|
@ -115,17 +118,18 @@ func (n *nfdGarbageCollector) deleteNodeHandler(object interface{}) {
|
|||
obj = deletedFinalStateUnknown.Obj
|
||||
}
|
||||
|
||||
node, ok := obj.(*corev1.Node)
|
||||
meta, ok := obj.(*metav1.PartialObjectMetadata)
|
||||
if !ok {
|
||||
klog.InfoS("cannot convert object to v1.Node", "object", object)
|
||||
klog.InfoS("cannot convert object to metav1.ObjectMeta", "object", object)
|
||||
return
|
||||
}
|
||||
nodeName := meta.ObjectMeta.GetName()
|
||||
|
||||
n.deleteNRT(node.GetName())
|
||||
n.deleteNRT(nodeName)
|
||||
|
||||
// Delete all NodeFeature objects (from all namespaces) targeting the deleted node
|
||||
nfListOptions := metav1.ListOptions{LabelSelector: nfdv1alpha1.NodeFeatureObjNodeNameLabel + "=" + node.GetName()}
|
||||
if nfs, err := n.nfdClient.NfdV1alpha1().NodeFeatures("").List(context.TODO(), nfListOptions); err != nil {
|
||||
nfListOptions := metav1.ListOptions{LabelSelector: nfdv1alpha1.NodeFeatureObjNodeNameLabel + "=" + nodeName}
|
||||
if nfs, err := n.client.Resource(gvrNF).List(context.TODO(), nfListOptions); err != nil {
|
||||
klog.ErrorS(err, "failed to list NodeFeature objects")
|
||||
} else {
|
||||
for _, nf := range nfs.Items {
|
||||
|
@ -137,24 +141,25 @@ func (n *nfdGarbageCollector) deleteNodeHandler(object interface{}) {
|
|||
// garbageCollect removes all stale API objects
|
||||
func (n *nfdGarbageCollector) garbageCollect() {
|
||||
klog.InfoS("performing garbage collection")
|
||||
nodes, err := n.factory.Core().V1().Nodes().Lister().List(labels.Everything())
|
||||
objs, err := n.factory.ForResource(gvrNode).Lister().List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to list Node objects")
|
||||
return
|
||||
}
|
||||
nodeNames := sets.NewString()
|
||||
for _, node := range nodes {
|
||||
nodeNames.Insert(node.Name)
|
||||
for _, obj := range objs {
|
||||
meta := obj.(*metav1.PartialObjectMetadata).ObjectMeta
|
||||
nodeNames.Insert(meta.Name)
|
||||
}
|
||||
|
||||
// Handle NodeFeature objects
|
||||
nfs, err := n.nfdClient.NfdV1alpha1().NodeFeatures("").List(context.TODO(), metav1.ListOptions{})
|
||||
objMetas, err := n.client.Resource(gvrNF).List(context.TODO(), metav1.ListOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
klog.V(2).InfoS("NodeFeature CRD does not exist")
|
||||
} else if err != nil {
|
||||
klog.ErrorS(err, "failed to list NodeFeature objects")
|
||||
} else {
|
||||
for _, nf := range nfs.Items {
|
||||
for _, nf := range objMetas.Items {
|
||||
nodeName, ok := nf.GetLabels()[nfdv1alpha1.NodeFeatureObjNodeNameLabel]
|
||||
if !ok {
|
||||
klog.InfoS("node name label missing from NodeFeature object", "nodefeature", klog.KObj(&nf))
|
||||
|
@ -166,13 +171,13 @@ func (n *nfdGarbageCollector) garbageCollect() {
|
|||
}
|
||||
|
||||
// Handle NodeResourceTopology objects
|
||||
nrts, err := n.topoClient.TopologyV1alpha2().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
|
||||
objMetas, err = n.client.Resource(gvrNRT).List(context.TODO(), metav1.ListOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
klog.V(2).InfoS("NodeResourceTopology CRD does not exist")
|
||||
} else if err != nil {
|
||||
klog.ErrorS(err, "failed to list NodeResourceTopology objects")
|
||||
} else {
|
||||
for _, nrt := range nrts.Items {
|
||||
for _, nrt := range objMetas.Items {
|
||||
if !nodeNames.Has(nrt.Name) {
|
||||
n.deleteNRT(nrt.Name)
|
||||
}
|
||||
|
@ -199,7 +204,7 @@ func (n *nfdGarbageCollector) periodicGC(gcPeriod time.Duration) {
|
|||
}
|
||||
|
||||
func (n *nfdGarbageCollector) startNodeInformer() error {
|
||||
nodeInformer := n.factory.Core().V1().Nodes().Informer()
|
||||
nodeInformer := n.factory.ForResource(gvrNode).Informer()
|
||||
|
||||
if _, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: n.deleteNodeHandler,
|
||||
|
|
|
@ -21,17 +21,16 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
|
||||
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
|
||||
faketopologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned/fake"
|
||||
topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/informers"
|
||||
k8sclientset "k8s.io/client-go/kubernetes"
|
||||
fakek8sclientset "k8s.io/client-go/kubernetes/fake"
|
||||
fakenfdclientset "sigs.k8s.io/node-feature-discovery/api/generated/clientset/versioned/fake"
|
||||
metadataclient "k8s.io/client-go/metadata"
|
||||
"k8s.io/client-go/metadata/fake"
|
||||
fakemetadataclient "k8s.io/client-go/metadata/fake"
|
||||
"k8s.io/client-go/metadata/metadatainformer"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
@ -43,7 +42,7 @@ func TestNRTGC(t *testing.T) {
|
|||
errChan := make(chan error)
|
||||
go func() { errChan <- gc.Run() }()
|
||||
|
||||
So(waitForNRT(gc.topoClient), ShouldBeTrue)
|
||||
So(waitForNRT(gc.client), ShouldBeTrue)
|
||||
|
||||
gc.Stop()
|
||||
So(<-errChan, ShouldBeNil)
|
||||
|
@ -54,7 +53,7 @@ func TestNRTGC(t *testing.T) {
|
|||
errChan := make(chan error)
|
||||
go func() { errChan <- gc.Run() }()
|
||||
|
||||
So(waitForNRT(gc.topoClient, "node1"), ShouldBeTrue)
|
||||
So(waitForNRT(gc.client, "node1"), ShouldBeTrue)
|
||||
|
||||
gc.Stop()
|
||||
So(<-errChan, ShouldBeNil)
|
||||
|
@ -65,85 +64,85 @@ func TestNRTGC(t *testing.T) {
|
|||
errChan := make(chan error)
|
||||
go func() { errChan <- gc.Run() }()
|
||||
|
||||
err := gc.k8sClient.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{})
|
||||
gvr := corev1.SchemeGroupVersion.WithResource("nodes")
|
||||
err := gc.client.Resource(gvr).Delete(context.TODO(), "node1", metav1.DeleteOptions{})
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(waitForNRT(gc.topoClient, "node2"), ShouldBeTrue)
|
||||
So(waitForNRT(gc.client, "node2"), ShouldBeTrue)
|
||||
})
|
||||
Convey("periodic GC should remove obsolete NRT", t, func() {
|
||||
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"})
|
||||
// Override period to run fast
|
||||
gc.args.GCPeriod = 100 * time.Millisecond
|
||||
|
||||
nrt := v1alpha2.NodeResourceTopology{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "not-existing",
|
||||
},
|
||||
}
|
||||
nrt := createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", "not-existing")
|
||||
|
||||
errChan := make(chan error)
|
||||
go func() { errChan <- gc.Run() }()
|
||||
|
||||
_, err := gc.topoClient.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrt, metav1.CreateOptions{})
|
||||
gvr := topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
|
||||
_, err := gc.client.Resource(gvr).(fake.MetadataClient).CreateFake(nrt, metav1.CreateOptions{})
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(waitForNRT(gc.topoClient, "node1", "node2"), ShouldBeTrue)
|
||||
So(waitForNRT(gc.client, "node1", "node2"), ShouldBeTrue)
|
||||
})
|
||||
}
|
||||
|
||||
func newMockGC(nodes, nrts []string) *mockGC {
|
||||
k8sClient := fakek8sclientset.NewSimpleClientset(createFakeNodes(nodes...)...)
|
||||
// Create fake objects
|
||||
objs := []runtime.Object{}
|
||||
for _, name := range nodes {
|
||||
objs = append(objs, createPartialObjectMetadata("v1", "Node", "", name))
|
||||
}
|
||||
for _, name := range nrts {
|
||||
objs = append(objs, createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", name))
|
||||
}
|
||||
|
||||
scheme := fake.NewTestScheme()
|
||||
_ = metav1.AddMetaToScheme(scheme)
|
||||
cli := fakemetadataclient.NewSimpleMetadataClient(scheme, objs...)
|
||||
return &mockGC{
|
||||
nfdGarbageCollector: nfdGarbageCollector{
|
||||
factory: informers.NewSharedInformerFactory(k8sClient, 5*time.Minute),
|
||||
nfdClient: fakenfdclientset.NewSimpleClientset(),
|
||||
topoClient: faketopologyv1alpha2.NewSimpleClientset(createFakeNRTs(nrts...)...),
|
||||
stopChan: make(chan struct{}),
|
||||
factory: metadatainformer.NewSharedInformerFactory(cli, 0),
|
||||
client: cli,
|
||||
stopChan: make(chan struct{}),
|
||||
args: &Args{
|
||||
GCPeriod: 10 * time.Minute,
|
||||
},
|
||||
},
|
||||
k8sClient: k8sClient,
|
||||
client: cli,
|
||||
}
|
||||
}
|
||||
|
||||
func createFakeNodes(names ...string) []runtime.Object {
|
||||
nodes := make([]runtime.Object, len(names))
|
||||
for i, n := range names {
|
||||
nodes[i] = &corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: n,
|
||||
}}
|
||||
func createPartialObjectMetadata(apiVersion, kind, namespace, name string) *metav1.PartialObjectMetadata {
|
||||
return &metav1.PartialObjectMetadata{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: apiVersion,
|
||||
Kind: kind,
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
Name: name,
|
||||
},
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func createFakeNRTs(names ...string) []runtime.Object {
|
||||
nrts := make([]runtime.Object, len(names))
|
||||
for i, n := range names {
|
||||
nrts[i] = &v1alpha2.NodeResourceTopology{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: n,
|
||||
}}
|
||||
}
|
||||
return nrts
|
||||
}
|
||||
|
||||
type mockGC struct {
|
||||
nfdGarbageCollector
|
||||
|
||||
k8sClient k8sclientset.Interface
|
||||
client metadataclient.Interface
|
||||
}
|
||||
|
||||
func waitForNRT(cli topologyclientset.Interface, names ...string) bool {
|
||||
func waitForNRT(cli metadataclient.Interface, names ...string) bool {
|
||||
nameSet := sets.NewString(names...)
|
||||
gvr := topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
|
||||
for i := 0; i < 2; i++ {
|
||||
nrts, err := cli.TopologyV1alpha2().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{})
|
||||
rsp, err := cli.Resource(gvr).List(context.TODO(), metav1.ListOptions{})
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
nrtNames := sets.NewString()
|
||||
for _, nrt := range nrts.Items {
|
||||
nrtNames.Insert(nrt.Name)
|
||||
for _, meta := range rsp.Items {
|
||||
nrtNames.Insert(meta.Name)
|
||||
}
|
||||
|
||||
if nrtNames.Equal(nameSet) {
|
||||
|
|
Loading…
Reference in a new issue