Mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git
nfd-master: use separate k8s api clients for each updater
Sharing the same client between updater threads effectively serializes API access, making the effective parallelism close to 1. With this patch, in my bench cluster of 300 nodes, the time taken to update all nodes drops from ~2 minutes to ~12 seconds (with the default parallelism of 10 node updater threads), demonstrating a roughly 10-fold increase in effective parallelism (from ~1 to ~10).

There might be other solutions worth exploring, e.g. caching nodes with an indexer/lister, but on the other hand nfd doesn't necessarily need (or want) to watch every little change in each node. We only need to get the node when something in our own CRDs changes (we don't react to changes in the node object itself). Using multiple clients was the most obvious way to solve the problem for now.
This commit is contained in:
  parent 31a56acdd4
  commit 8ad6210d5c

3 changed files with 45 additions and 33 deletions
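The serialization described above is most plausibly client-go's client-side rate limiting: unless rest.Config overrides QPS/Burst, each client instance gets its own token bucket (defaults: QPS 5, burst 10), and every goroutine sharing that client shares the bucket. Giving each worker its own client, built from a shared rest.Config, gives each worker its own limiter. The following is a minimal, self-contained sketch of that pattern, not code from this commit; the channel-based worker pool and the node names are illustrative.

package main

import (
	"context"
	"fmt"
	"sync"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8sclient "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load the kubeconfig once; every worker derives its own client from it.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	work := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ { // mirrors the default of 10 node updater threads
		wg.Add(1)
		go func() {
			defer wg.Done()
			// One client per worker: each client carries its own rate
			// limiter, so workers no longer contend on a shared bucket.
			cli := k8sclient.NewForConfigOrDie(cfg)
			for name := range work {
				if _, err := cli.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}); err != nil {
					fmt.Printf("failed to get node %s: %v\n", name, err)
				}
			}
		}()
	}

	for _, name := range []string{"node-1", "node-2", "node-3"} { // hypothetical node names
		work <- name
	}
	close(work)
	wg.Wait()
}

The trade-off is proportionally more connections and a higher aggregate request rate against the apiserver, which is exactly what the patch intends.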
@@ -167,7 +167,7 @@ func TestUpdateNodeObject(t *testing.T) {
 		fakeMaster := newFakeMaster(WithKubernetesClient(fakeCli))
 
 		Convey("When I successfully update the node with feature labels", func() {
-			err := fakeMaster.updateNodeObject(testNode, featureLabels, featureAnnotations, featureExtResources, nil)
+			err := fakeMaster.updateNodeObject(fakeCli, testNode, featureLabels, featureAnnotations, featureExtResources, nil)
			Convey("Error is nil", func() {
				So(err, ShouldBeNil)
			})
@@ -199,7 +199,7 @@ func TestUpdateNodeObject(t *testing.T) {
 		fakeCli.CoreV1().(*fakecorev1client.FakeCoreV1).PrependReactor("patch", "nodes", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
 			return true, &v1.Node{}, errors.New("Fake error when patching node")
 		})
-		err := fakeMaster.updateNodeObject(testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
+		err := fakeMaster.updateNodeObject(fakeCli, testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
 
 		Convey("Error is produced", func() {
			So(err, ShouldBeError)
@@ -44,6 +44,7 @@ import (
 	k8sLabels "k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	k8sclient "k8s.io/client-go/kubernetes"
+	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/klog/v2"
@@ -150,6 +151,7 @@ type nfdMaster struct {
 	healthServer    *grpc.Server
 	stop            chan struct{}
 	ready           chan struct{}
+	kubeconfig      *restclient.Config
 	k8sClient       k8sclient.Interface
 	nodeUpdaterPool *nodeUpdaterPool
 	deniedNs
@@ -200,6 +202,7 @@ func NewNfdMaster(opts ...NfdMasterOption) (NfdMaster, error) {
 	if err != nil {
 		return nfd, err
 	}
+	nfd.kubeconfig = kubeconfig
 	cli, err := k8sclient.NewForConfig(kubeconfig)
 	if err != nil {
 		return nfd, err
@@ -528,7 +531,7 @@ func (m *nfdMaster) prune() error {
 		return nil
 	}
 
-	nodes, err := m.getNodes()
+	nodes, err := getNodes(m.k8sClient)
 	if err != nil {
 		return err
 	}
@@ -537,14 +540,14 @@
 		klog.InfoS("pruning node...", "nodeName", node.Name)
 
 		// Prune labels and extended resources
-		err := m.updateNodeObject(&node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
+		err := m.updateNodeObject(m.k8sClient, &node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
 		if err != nil {
 			nodeUpdateFailures.Inc()
 			return fmt.Errorf("failed to prune node %q: %v", node.Name, err)
 		}
 
 		// Prune annotations
-		node, err := m.getNode(node.Name)
+		node, err := getNode(m.k8sClient, node.Name)
 		if err != nil {
 			return err
 		}
@@ -564,7 +567,7 @@
 // "nfd.node.kubernetes.io/master.version" annotation, if it exists.
 // TODO: Drop when nfdv1alpha1.MasterVersionAnnotation is removed.
 func (m *nfdMaster) updateMasterNode() error {
-	node, err := m.getNode(m.nodeName)
+	node, err := getNode(m.k8sClient, m.nodeName)
 	if err != nil {
 		return err
 	}
@@ -575,7 +578,7 @@
 		nil,
 		"/metadata/annotations")
 
-	err = m.patchNode(node.Name, p)
+	err = patchNode(m.k8sClient, node.Name, p)
 	if err != nil {
 		return fmt.Errorf("failed to patch node annotations: %w", err)
 	}
@@ -727,12 +730,12 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.SetLabelsReply, error) {
 	}
 	if !m.config.NoPublish {
 		// Fetch the node object.
-		node, err := m.getNode(r.NodeName)
+		node, err := getNode(m.k8sClient, r.NodeName)
 		if err != nil {
 			return &pb.SetLabelsReply{}, err
 		}
 		// Create labels et al
-		if err := m.refreshNodeFeatures(node, r.GetLabels(), r.GetFeatures()); err != nil {
+		if err := m.refreshNodeFeatures(m.k8sClient, node, r.GetLabels(), r.GetFeatures()); err != nil {
 			nodeUpdateFailures.Inc()
 			return &pb.SetLabelsReply{}, err
 		}
@@ -743,7 +746,7 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.SetLabelsReply, error) {
 func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
 	klog.InfoS("will process all nodes in the cluster")
 
-	nodes, err := m.getNodes()
+	nodes, err := getNodes(m.k8sClient)
 	if err != nil {
 		return err
 	}
@@ -755,7 +758,7 @@
 	return nil
 }
 
-func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
+func (m *nfdMaster) nfdAPIUpdateOneNode(cli k8sclient.Interface, node *corev1.Node) error {
 	if m.nfdController == nil || m.nfdController.featureLister == nil {
 		return nil
 	}
@@ -810,7 +813,7 @@ func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
 	// Update node labels et al. This may also mean removing all NFD-owned
 	// labels (et al.), for example in the case no NodeFeature objects are
 	// present.
-	if err := m.refreshNodeFeatures(node, features.Labels, &features.Features); err != nil {
+	if err := m.refreshNodeFeatures(cli, node, features.Labels, &features.Features); err != nil {
 		return err
 	}
 
@@ -855,7 +858,7 @@ func filterExtendedResource(name, value string, features *nfdv1alpha1.Features)
 	return filteredValue, nil
 }
 
-func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
+func (m *nfdMaster) refreshNodeFeatures(cli k8sclient.Interface, node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
 	if m.config.AutoDefaultNs {
 		labels = addNsToMapKeys(labels, nfdv1alpha1.FeatureLabelNs)
 	} else if labels == nil {
@@ -889,7 +892,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
 		return nil
 	}
 
-	err := m.updateNodeObject(node, labels, annotations, extendedResources, taints)
+	err := m.updateNodeObject(cli, node, labels, annotations, extendedResources, taints)
 	if err != nil {
 		klog.ErrorS(err, "failed to update node", "nodeName", node.Name)
 		return err
@@ -901,7 +904,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
 // setTaints sets node taints and annotations based on the taints passed via
 // nodeFeatureRule custom resorce. If empty list of taints is passed, currently
 // NFD owned taints and annotations are removed from the node.
-func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
+func setTaints(cli k8sclient.Interface, taints []corev1.Taint, node *corev1.Node) error {
 	// De-serialize the taints annotation into corev1.Taint type for comparision below.
 	var err error
 	oldTaints := []corev1.Taint{}
@@ -940,7 +943,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
 	}
 
 	if taintsUpdated {
-		if err := controller.PatchNodeTaints(context.TODO(), m.k8sClient, node.Name, node, newNode); err != nil {
+		if err := controller.PatchNodeTaints(context.TODO(), cli, node.Name, node, newNode); err != nil {
 			return fmt.Errorf("failed to patch the node %v", node.Name)
 		}
 		klog.InfoS("updated node taints", "nodeName", node.Name)
@@ -960,7 +963,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
 
 	patches := createPatches([]string{nfdv1alpha1.NodeTaintsAnnotation}, node.Annotations, newAnnotations, "/metadata/annotations")
 	if len(patches) > 0 {
-		if err := m.patchNode(node.Name, patches); err != nil {
+		if err := patchNode(cli, node.Name, patches); err != nil {
 			return fmt.Errorf("error while patching node object: %w", err)
 		}
 		klog.V(1).InfoS("patched node annotations for taints", "nodeName", node.Name)
@@ -1057,7 +1060,7 @@ func (m *nfdMaster) processNodeFeatureRule(nodeName string, features *nfdv1alpha
 // updateNodeObject ensures the Kubernetes node object is up to date,
 // creating new labels and extended resources where necessary and removing
 // outdated ones. Also updates the corresponding annotations.
-func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
+func (m *nfdMaster) updateNodeObject(cli k8sclient.Interface, node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
 	annotations := make(Annotations)
 
 	// Store names of labels in an annotation
|
|||
|
||||
// patch node status with extended resource changes
|
||||
statusPatches := m.createExtendedResourcePatches(node, extendedResources)
|
||||
err := m.patchNodeStatus(node.Name, statusPatches)
|
||||
err := patchNodeStatus(cli, node.Name, statusPatches)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while patching extended resources: %w", err)
|
||||
}
|
||||
|
||||
// Patch the node object in the apiserver
|
||||
err = m.patchNode(node.Name, patches)
|
||||
err = patchNode(cli, node.Name, patches)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while patching node object: %w", err)
|
||||
}
|
||||
|
@@ -1129,7 +1132,7 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
 	}
 
 	// Set taints
-	err = m.setTaints(taints, node)
+	err = setTaints(cli, taints, node)
 	if err != nil {
 		return err
 	}
@@ -1420,25 +1423,25 @@ func (m *nfdMaster) filterFeatureAnnotations(annotations map[string]string) map[string]string {
 	return outAnnotations
 }
 
-func (m *nfdMaster) getNode(nodeName string) (*corev1.Node, error) {
-	return m.k8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+func getNode(cli k8sclient.Interface, nodeName string) (*corev1.Node, error) {
+	return cli.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
 }
 
-func (m *nfdMaster) getNodes() (*corev1.NodeList, error) {
-	return m.k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
+func getNodes(cli k8sclient.Interface) (*corev1.NodeList, error) {
+	return cli.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
 }
 
-func (m *nfdMaster) patchNode(nodeName string, patches []utils.JsonPatch, subresources ...string) error {
+func patchNode(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch, subresources ...string) error {
 	if len(patches) == 0 {
 		return nil
 	}
 	data, err := json.Marshal(patches)
 	if err == nil {
-		_, err = m.k8sClient.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
+		_, err = cli.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
 	}
 	return err
 }
 
-func (m *nfdMaster) patchNodeStatus(nodeName string, patches []utils.JsonPatch) error {
-	return m.patchNode(nodeName, patches, "status")
+func patchNodeStatus(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch) error {
+	return patchNode(cli, nodeName, patches, "status")
 }
@@ -22,6 +22,7 @@ import (
 
 	"golang.org/x/time/rate"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	k8sclient "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 )
@@ -41,7 +42,7 @@ func newNodeUpdaterPool(nfdMaster *nfdMaster) *nodeUpdaterPool {
 	}
 }
 
-func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingInterface) bool {
+func (u *nodeUpdaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool {
 	n, quit := queue.Get()
 	if quit {
 		return false
@@ -53,9 +54,9 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingInterface) bool {
 	nodeUpdateRequests.Inc()
 
 	// Check if node exists
-	if node, err := u.nfdMaster.getNode(nodeName); apierrors.IsNotFound(err) {
+	if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) {
 		klog.InfoS("node not found, skip update", "nodeName", nodeName)
-	} else if err := u.nfdMaster.nfdAPIUpdateOneNode(node); err != nil {
+	} else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil {
 		if n := queue.NumRequeues(nodeName); n < 15 {
 			klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n)
 		} else {
@@ -71,7 +72,15 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingInterface) bool {
 }
 
 func (u *nodeUpdaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
-	for u.processNodeUpdateRequest(queue) {
+	var cli k8sclient.Interface
+	if u.nfdMaster.kubeconfig != nil {
+		// For normal execution, initialize a separate api client for each updater
+		cli = k8sclient.NewForConfigOrDie(u.nfdMaster.kubeconfig)
+	} else {
+		// For tests, re-use the api client from nfd-master
+		cli = u.nfdMaster.k8sClient
+	}
+	for u.processNodeUpdateRequest(cli, queue) {
 	}
 	u.wg.Done()
 }