1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2025-03-31 04:04:51 +00:00

Merge pull request #1653 from marquiz/devel/master-multiple-k8sclients

nfd-master: use separate k8s api clients for each updater
This commit is contained in:
Kubernetes Prow Robot 2024-04-15 09:18:51 -07:00 committed by GitHub
commit 91d3d5a7b0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 45 additions and 33 deletions

View file

@ -167,7 +167,7 @@ func TestUpdateNodeObject(t *testing.T) {
fakeMaster := newFakeMaster(WithKubernetesClient(fakeCli))
Convey("When I successfully update the node with feature labels", func() {
err := fakeMaster.updateNodeObject(testNode, featureLabels, featureAnnotations, featureExtResources, nil)
err := fakeMaster.updateNodeObject(fakeCli, testNode, featureLabels, featureAnnotations, featureExtResources, nil)
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
@ -199,7 +199,7 @@ func TestUpdateNodeObject(t *testing.T) {
fakeCli.CoreV1().(*fakecorev1client.FakeCoreV1).PrependReactor("patch", "nodes", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &v1.Node{}, errors.New("Fake error when patching node")
})
err := fakeMaster.updateNodeObject(testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
err := fakeMaster.updateNodeObject(fakeCli, testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
Convey("Error is produced", func() {
So(err, ShouldBeError)

View file

@ -44,6 +44,7 @@ import (
k8sLabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
k8sclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
@ -150,6 +151,7 @@ type nfdMaster struct {
healthServer *grpc.Server
stop chan struct{}
ready chan struct{}
kubeconfig *restclient.Config
k8sClient k8sclient.Interface
nodeUpdaterPool *nodeUpdaterPool
deniedNs
@ -200,6 +202,7 @@ func NewNfdMaster(opts ...NfdMasterOption) (NfdMaster, error) {
if err != nil {
return nfd, err
}
nfd.kubeconfig = kubeconfig
cli, err := k8sclient.NewForConfig(kubeconfig)
if err != nil {
return nfd, err
@ -528,7 +531,7 @@ func (m *nfdMaster) prune() error {
return nil
}
nodes, err := m.getNodes()
nodes, err := getNodes(m.k8sClient)
if err != nil {
return err
}
@ -537,14 +540,14 @@ func (m *nfdMaster) prune() error {
klog.InfoS("pruning node...", "nodeName", node.Name)
// Prune labels and extended resources
err := m.updateNodeObject(&node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
err := m.updateNodeObject(m.k8sClient, &node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
if err != nil {
nodeUpdateFailures.Inc()
return fmt.Errorf("failed to prune node %q: %v", node.Name, err)
}
// Prune annotations
node, err := m.getNode(node.Name)
node, err := getNode(m.k8sClient, node.Name)
if err != nil {
return err
}
@ -564,7 +567,7 @@ func (m *nfdMaster) prune() error {
// "nfd.node.kubernetes.io/master.version" annotation, if it exists.
// TODO: Drop when nfdv1alpha1.MasterVersionAnnotation is removed.
func (m *nfdMaster) updateMasterNode() error {
node, err := m.getNode(m.nodeName)
node, err := getNode(m.k8sClient, m.nodeName)
if err != nil {
return err
}
@ -575,7 +578,7 @@ func (m *nfdMaster) updateMasterNode() error {
nil,
"/metadata/annotations")
err = m.patchNode(node.Name, p)
err = patchNode(m.k8sClient, node.Name, p)
if err != nil {
return fmt.Errorf("failed to patch node annotations: %w", err)
}
@ -727,12 +730,12 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
}
if !m.config.NoPublish {
// Fetch the node object.
node, err := m.getNode(r.NodeName)
node, err := getNode(m.k8sClient, r.NodeName)
if err != nil {
return &pb.SetLabelsReply{}, err
}
// Create labels et al
if err := m.refreshNodeFeatures(node, r.GetLabels(), r.GetFeatures()); err != nil {
if err := m.refreshNodeFeatures(m.k8sClient, node, r.GetLabels(), r.GetFeatures()); err != nil {
nodeUpdateFailures.Inc()
return &pb.SetLabelsReply{}, err
}
@ -743,7 +746,7 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
klog.InfoS("will process all nodes in the cluster")
nodes, err := m.getNodes()
nodes, err := getNodes(m.k8sClient)
if err != nil {
return err
}
@ -755,7 +758,7 @@ func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
return nil
}
func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
func (m *nfdMaster) nfdAPIUpdateOneNode(cli k8sclient.Interface, node *corev1.Node) error {
if m.nfdController == nil || m.nfdController.featureLister == nil {
return nil
}
@ -810,7 +813,7 @@ func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
// Update node labels et al. This may also mean removing all NFD-owned
// labels (et al.), for example in the case no NodeFeature objects are
// present.
if err := m.refreshNodeFeatures(node, features.Labels, &features.Features); err != nil {
if err := m.refreshNodeFeatures(cli, node, features.Labels, &features.Features); err != nil {
return err
}
@ -855,7 +858,7 @@ func filterExtendedResource(name, value string, features *nfdv1alpha1.Features)
return filteredValue, nil
}
func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
func (m *nfdMaster) refreshNodeFeatures(cli k8sclient.Interface, node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
if m.config.AutoDefaultNs {
labels = addNsToMapKeys(labels, nfdv1alpha1.FeatureLabelNs)
} else if labels == nil {
@ -889,7 +892,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]str
return nil
}
err := m.updateNodeObject(node, labels, annotations, extendedResources, taints)
err := m.updateNodeObject(cli, node, labels, annotations, extendedResources, taints)
if err != nil {
klog.ErrorS(err, "failed to update node", "nodeName", node.Name)
return err
@ -901,7 +904,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]str
// setTaints sets node taints and annotations based on the taints passed via
// nodeFeatureRule custom resorce. If empty list of taints is passed, currently
// NFD owned taints and annotations are removed from the node.
func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
func setTaints(cli k8sclient.Interface, taints []corev1.Taint, node *corev1.Node) error {
// De-serialize the taints annotation into corev1.Taint type for comparision below.
var err error
oldTaints := []corev1.Taint{}
@ -940,7 +943,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
}
if taintsUpdated {
if err := controller.PatchNodeTaints(context.TODO(), m.k8sClient, node.Name, node, newNode); err != nil {
if err := controller.PatchNodeTaints(context.TODO(), cli, node.Name, node, newNode); err != nil {
return fmt.Errorf("failed to patch the node %v", node.Name)
}
klog.InfoS("updated node taints", "nodeName", node.Name)
@ -960,7 +963,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
patches := createPatches([]string{nfdv1alpha1.NodeTaintsAnnotation}, node.Annotations, newAnnotations, "/metadata/annotations")
if len(patches) > 0 {
if err := m.patchNode(node.Name, patches); err != nil {
if err := patchNode(cli, node.Name, patches); err != nil {
return fmt.Errorf("error while patching node object: %w", err)
}
klog.V(1).InfoS("patched node annotations for taints", "nodeName", node.Name)
@ -1057,7 +1060,7 @@ func (m *nfdMaster) processNodeFeatureRule(nodeName string, features *nfdv1alpha
// updateNodeObject ensures the Kubernetes node object is up to date,
// creating new labels and extended resources where necessary and removing
// outdated ones. Also updates the corresponding annotations.
func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
func (m *nfdMaster) updateNodeObject(cli k8sclient.Interface, node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
annotations := make(Annotations)
// Store names of labels in an annotation
@ -1110,13 +1113,13 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAn
// patch node status with extended resource changes
statusPatches := m.createExtendedResourcePatches(node, extendedResources)
err := m.patchNodeStatus(node.Name, statusPatches)
err := patchNodeStatus(cli, node.Name, statusPatches)
if err != nil {
return fmt.Errorf("error while patching extended resources: %w", err)
}
// Patch the node object in the apiserver
err = m.patchNode(node.Name, patches)
err = patchNode(cli, node.Name, patches)
if err != nil {
return fmt.Errorf("error while patching node object: %w", err)
}
@ -1129,7 +1132,7 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAn
}
// Set taints
err = m.setTaints(taints, node)
err = setTaints(cli, taints, node)
if err != nil {
return err
}
@ -1420,25 +1423,25 @@ func (m *nfdMaster) filterFeatureAnnotations(annotations map[string]string) map[
return outAnnotations
}
func (m *nfdMaster) getNode(nodeName string) (*corev1.Node, error) {
return m.k8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
func getNode(cli k8sclient.Interface, nodeName string) (*corev1.Node, error) {
return cli.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}
func (m *nfdMaster) getNodes() (*corev1.NodeList, error) {
return m.k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
func getNodes(cli k8sclient.Interface) (*corev1.NodeList, error) {
return cli.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
}
func (m *nfdMaster) patchNode(nodeName string, patches []utils.JsonPatch, subresources ...string) error {
func patchNode(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch, subresources ...string) error {
if len(patches) == 0 {
return nil
}
data, err := json.Marshal(patches)
if err == nil {
_, err = m.k8sClient.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
_, err = cli.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
}
return err
}
func (m *nfdMaster) patchNodeStatus(nodeName string, patches []utils.JsonPatch) error {
return m.patchNode(nodeName, patches, "status")
func patchNodeStatus(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch) error {
return patchNode(cli, nodeName, patches, "status")
}

View file

@ -22,6 +22,7 @@ import (
"golang.org/x/time/rate"
apierrors "k8s.io/apimachinery/pkg/api/errors"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
@ -41,7 +42,7 @@ func newNodeUpdaterPool(nfdMaster *nfdMaster) *nodeUpdaterPool {
}
}
func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingInterface) bool {
func (u *nodeUpdaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool {
n, quit := queue.Get()
if quit {
return false
@ -53,9 +54,9 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
nodeUpdateRequests.Inc()
// Check if node exists
if node, err := u.nfdMaster.getNode(nodeName); apierrors.IsNotFound(err) {
if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) {
klog.InfoS("node not found, skip update", "nodeName", nodeName)
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(node); err != nil {
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil {
if n := queue.NumRequeues(nodeName); n < 15 {
klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n)
} else {
@ -71,7 +72,15 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
}
func (u *nodeUpdaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
for u.processNodeUpdateRequest(queue) {
var cli k8sclient.Interface
if u.nfdMaster.kubeconfig != nil {
// For normal execution, initialize a separate api client for each updater
cli = k8sclient.NewForConfigOrDie(u.nfdMaster.kubeconfig)
} else {
// For tests, re-use the api client from nfd-master
cli = u.nfdMaster.k8sClient
}
for u.processNodeUpdateRequest(cli, queue) {
}
u.wg.Done()
}