mirror of
https://github.com/kubernetes-sigs/node-feature-discovery.git
synced 2024-12-14 11:57:51 +00:00
nfd-master: support NodeFeature objects
Add initial support for handling NodeFeature objects. With this patch nfd-master watches NodeFeature objects in all namespaces and reacts to changes in any of these. The node which a certain NodeFeature object affects is determined by the "nfd.node.kubernetes.io/node-name" annotation of the object. When a NodeFeature object targeting certain node is changed, nfd-master needs to process all other objects targeting the same node, too, because there may be dependencies between them. Add a new command line flag for selecting between gRPC and NodeFeature CRD API as the source of feature requests. Enabling NodeFeature API disables the gRPC interface. -enable-nodefeature-api enable NodeFeature CRD API for incoming feature requests, will disable the gRPC interface (defaults to false) It is not possible to serve gRPC and watch NodeFeature objects at the same time. This is deliberate to avoid labeling races e.g. by nfd-worker sending gRPC requests but NodeFeature objects in the cluster "overriding" those changes (labels from the gRPC requests will get overridden when NodeFeature objects are processed).
This commit is contained in:
parent
237494463b
commit
6ddd87e465
9 changed files with 272 additions and 68 deletions
|
@ -94,6 +94,8 @@ func initFlags(flagset *flag.FlagSet) *master.Args {
|
|||
flagset.Var(&args.LabelWhiteList, "label-whitelist",
|
||||
"Regular expression to filter label names to publish to the Kubernetes API server. "+
|
||||
"NB: the label namespace is omitted i.e. the filter is only applied to the name part after '/'.")
|
||||
flagset.BoolVar(&args.EnableNodeFeatureApi, "-enable-nodefeature-api", false,
|
||||
"Enable the NodeFeature CRD API for receiving node features. This will automatically disable the gRPC communication.")
|
||||
flagset.BoolVar(&args.NoPublish, "no-publish", false,
|
||||
"Do not publish feature labels")
|
||||
flagset.BoolVar(&args.EnableTaints, "enable-taints", false,
|
||||
|
|
|
@ -15,6 +15,7 @@ rules:
|
|||
- apiGroups:
|
||||
- nfd.k8s-sigs.io
|
||||
resources:
|
||||
- nodefeatures
|
||||
- nodefeaturerules
|
||||
verbs:
|
||||
- get
|
||||
|
|
|
@ -27,6 +27,7 @@ rules:
|
|||
- apiGroups:
|
||||
- nfd.k8s-sigs.io
|
||||
resources:
|
||||
- nodefeatures
|
||||
- nodefeaturerules
|
||||
verbs:
|
||||
- get
|
||||
|
|
|
@ -78,6 +78,9 @@ spec:
|
|||
{{- if .Values.master.instance | empty | not }}
|
||||
- "--instance={{ .Values.master.instance }}"
|
||||
{{- end }}
|
||||
{{- if .Values.enableNodeFeatureApi }}
|
||||
- "-enable-nodefeature-api"
|
||||
{{- end }}
|
||||
{{- if .Values.master.extraLabelNs | empty | not }}
|
||||
- "--extra-label-ns={{- join "," .Values.master.extraLabelNs }}"
|
||||
{{- end }}
|
||||
|
|
|
@ -14,6 +14,7 @@ enableNodeFeatureApi: false
|
|||
|
||||
master:
|
||||
instance:
|
||||
featureApi:
|
||||
extraLabelNs: []
|
||||
resourceLabels: []
|
||||
featureRulesController: null
|
||||
|
|
|
@ -149,6 +149,20 @@ nfd-master -verify-node-name -ca-file=/opt/nfd/ca.crt \
|
|||
-cert-file=/opt/nfd/master.crt -key-file=/opt/nfd/master.key
|
||||
```
|
||||
|
||||
### -enable-nodefeature-api
|
||||
|
||||
The `-enable-nodefeature-api` flag enables the NodeFeature CRD API for
|
||||
receiving feature requests. This will also automatically disable the gRPC
|
||||
interface.
|
||||
|
||||
Default: false
|
||||
|
||||
Example:
|
||||
|
||||
```bash
|
||||
nfd-master -enable-nodefeature-api
|
||||
```
|
||||
|
||||
### -no-publish
|
||||
|
||||
The `-no-publish` flag disables updates to the Node objects in the Kubernetes
|
||||
|
|
|
@ -32,42 +32,88 @@ import (
|
|||
)
|
||||
|
||||
type nfdController struct {
|
||||
ruleLister nfdlisters.NodeFeatureRuleLister
|
||||
featureLister nfdlisters.NodeFeatureLister
|
||||
ruleLister nfdlisters.NodeFeatureRuleLister
|
||||
|
||||
stopChan chan struct{}
|
||||
|
||||
updateAllNodesChan chan struct{}
|
||||
updateOneNodeChan chan string
|
||||
}
|
||||
|
||||
func newNfdController(config *restclient.Config) (*nfdController, error) {
|
||||
func newNfdController(config *restclient.Config, disableNodeFeature bool) (*nfdController, error) {
|
||||
c := &nfdController{
|
||||
stopChan: make(chan struct{}, 1),
|
||||
stopChan: make(chan struct{}, 1),
|
||||
updateAllNodesChan: make(chan struct{}, 1),
|
||||
updateOneNodeChan: make(chan string),
|
||||
}
|
||||
|
||||
nfdClient := nfdclientset.NewForConfigOrDie(config)
|
||||
|
||||
informerFactory := nfdinformers.NewSharedInformerFactory(nfdClient, 5*time.Minute)
|
||||
|
||||
// Add informer for NodeFeature objects
|
||||
if !disableNodeFeature {
|
||||
featureInformer := informerFactory.Nfd().V1alpha1().NodeFeatures()
|
||||
if _, err := featureInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
key, _ := cache.MetaNamespaceKeyFunc(obj)
|
||||
klog.V(2).Infof("NodeFeature %v added", key)
|
||||
c.updateOneNode(obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
key, _ := cache.MetaNamespaceKeyFunc(newObj)
|
||||
klog.V(2).Infof("NodeFeature %v updated", key)
|
||||
c.updateOneNode(newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
key, _ := cache.MetaNamespaceKeyFunc(obj)
|
||||
klog.V(2).Infof("NodeFeature %v deleted", key)
|
||||
c.updateOneNode(obj)
|
||||
},
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.featureLister = featureInformer.Lister()
|
||||
}
|
||||
|
||||
// Add informer for NodeFeatureRule objects
|
||||
ruleInformer := informerFactory.Nfd().V1alpha1().NodeFeatureRules()
|
||||
if _, err := ruleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(object interface{}) {
|
||||
key, _ := cache.MetaNamespaceKeyFunc(object)
|
||||
klog.V(2).Infof("NodeFeatureRule %v added", key)
|
||||
if !disableNodeFeature {
|
||||
c.updateAllNodes()
|
||||
}
|
||||
// else: rules will be processed only when gRPC requests are received
|
||||
},
|
||||
UpdateFunc: func(oldObject, newObject interface{}) {
|
||||
key, _ := cache.MetaNamespaceKeyFunc(newObject)
|
||||
klog.V(2).Infof("NodeFeatureRule %v updated", key)
|
||||
if !disableNodeFeature {
|
||||
c.updateAllNodes()
|
||||
}
|
||||
// else: rules will be processed only when gRPC requests are received
|
||||
},
|
||||
DeleteFunc: func(object interface{}) {
|
||||
key, _ := cache.MetaNamespaceKeyFunc(object)
|
||||
klog.V(2).Infof("NodeFeatureRule %v deleted", key)
|
||||
if !disableNodeFeature {
|
||||
c.updateAllNodes()
|
||||
}
|
||||
// else: rules will be processed only when gRPC requests are received
|
||||
},
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.ruleLister = ruleInformer.Lister()
|
||||
|
||||
// Start informers
|
||||
informerFactory.Start(c.stopChan)
|
||||
|
||||
utilruntime.Must(nfdv1alpha1.AddToScheme(nfdscheme.Scheme))
|
||||
|
||||
c.ruleLister = ruleInformer.Lister()
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
|
@ -77,3 +123,32 @@ func (c *nfdController) stop() {
|
|||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *nfdController) updateOneNode(obj interface{}) {
|
||||
o, ok := obj.(*nfdv1alpha1.NodeFeature)
|
||||
if !ok {
|
||||
klog.Errorf("not a NodeFeature object (but of type %T): %v", obj, obj)
|
||||
return
|
||||
}
|
||||
|
||||
nodeName, ok := o.Labels[nfdv1alpha1.NodeFeatureObjNodeNameLabel]
|
||||
if !ok {
|
||||
klog.Errorf("no node name for NodeFeature object %s/%s: %q label is missing",
|
||||
o.Namespace, o.Name, nfdv1alpha1.NodeFeatureObjNodeNameLabel)
|
||||
return
|
||||
}
|
||||
if nodeName == "" {
|
||||
klog.Errorf("no node name for NodeFeature object %s/%s: %q label is empty",
|
||||
o.Namespace, o.Name, nfdv1alpha1.NodeFeatureObjNodeNameLabel)
|
||||
return
|
||||
}
|
||||
|
||||
c.updateOneNodeChan <- nodeName
|
||||
}
|
||||
|
||||
func (c *nfdController) updateAllNodes() {
|
||||
select {
|
||||
case c.updateAllNodesChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,7 +61,7 @@ func newMockMaster(apihelper apihelper.APIHelpers) *nfdMaster {
|
|||
}
|
||||
}
|
||||
|
||||
func TestUpdateNodeFeatures(t *testing.T) {
|
||||
func TestUpdateNodeObject(t *testing.T) {
|
||||
Convey("When I update the node using fake client", t, func() {
|
||||
fakeFeatureLabels := map[string]string{
|
||||
nfdv1alpha1.FeatureLabelNs + "/source-feature.1": "1",
|
||||
|
@ -112,10 +112,10 @@ func TestUpdateNodeFeatures(t *testing.T) {
|
|||
}
|
||||
|
||||
mockAPIHelper.On("GetClient").Return(mockClient, nil)
|
||||
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(mockNode, nil).Once()
|
||||
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(mockNode, nil).Twice()
|
||||
mockAPIHelper.On("PatchNode", mockClient, mockNodeName, mock.MatchedBy(jsonPatchMatcher(metadataPatches))).Return(nil)
|
||||
mockAPIHelper.On("PatchNodeStatus", mockClient, mockNodeName, mock.MatchedBy(jsonPatchMatcher(statusPatches))).Return(nil)
|
||||
err := mockMaster.updateNodeFeatures(mockClient, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
|
||||
err := mockMaster.updateNodeObject(mockClient, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources, nil)
|
||||
|
||||
Convey("Error is nil", func() {
|
||||
So(err, ShouldBeNil)
|
||||
|
@ -125,7 +125,7 @@ func TestUpdateNodeFeatures(t *testing.T) {
|
|||
Convey("When I fail to update the node with feature labels", func() {
|
||||
expectedError := fmt.Errorf("no client is passed, client: <nil>")
|
||||
mockAPIHelper.On("GetClient").Return(nil, expectedError)
|
||||
err := mockMaster.updateNodeFeatures(nil, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
|
||||
err := mockMaster.updateNodeObject(nil, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources, nil)
|
||||
|
||||
Convey("Error is produced", func() {
|
||||
So(err, ShouldResemble, expectedError)
|
||||
|
@ -135,7 +135,7 @@ func TestUpdateNodeFeatures(t *testing.T) {
|
|||
Convey("When I fail to get a mock client while updating feature labels", func() {
|
||||
expectedError := fmt.Errorf("no client is passed, client: <nil>")
|
||||
mockAPIHelper.On("GetClient").Return(nil, expectedError)
|
||||
err := mockMaster.updateNodeFeatures(nil, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
|
||||
err := mockMaster.updateNodeObject(nil, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources, nil)
|
||||
|
||||
Convey("Error is produced", func() {
|
||||
So(err, ShouldResemble, expectedError)
|
||||
|
@ -145,8 +145,8 @@ func TestUpdateNodeFeatures(t *testing.T) {
|
|||
Convey("When I fail to get a mock node while updating feature labels", func() {
|
||||
expectedError := errors.New("fake error")
|
||||
mockAPIHelper.On("GetClient").Return(mockClient, nil)
|
||||
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(nil, expectedError).Once()
|
||||
err := mockMaster.updateNodeFeatures(mockClient, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
|
||||
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(nil, expectedError).Twice()
|
||||
err := mockMaster.updateNodeObject(mockClient, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources, nil)
|
||||
|
||||
Convey("Error is produced", func() {
|
||||
So(err, ShouldEqual, expectedError)
|
||||
|
@ -156,9 +156,9 @@ func TestUpdateNodeFeatures(t *testing.T) {
|
|||
Convey("When I fail to update a mock node while updating feature labels", func() {
|
||||
expectedError := errors.New("fake error")
|
||||
mockAPIHelper.On("GetClient").Return(mockClient, nil)
|
||||
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(mockNode, nil).Once()
|
||||
mockAPIHelper.On("PatchNode", mockClient, mockNodeName, mock.Anything).Return(expectedError).Once()
|
||||
err := mockMaster.updateNodeFeatures(mockClient, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
|
||||
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(mockNode, nil).Twice()
|
||||
mockAPIHelper.On("PatchNode", mockClient, mockNodeName, mock.Anything).Return(expectedError).Twice()
|
||||
err := mockMaster.updateNodeObject(mockClient, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources, nil)
|
||||
|
||||
Convey("Error is produced", func() {
|
||||
So(err.Error(), ShouldEndWith, expectedError.Error())
|
||||
|
@ -294,7 +294,7 @@ func TestRemovingExtResources(t *testing.T) {
|
|||
|
||||
func TestSetLabels(t *testing.T) {
|
||||
Convey("When servicing SetLabels request", t, func() {
|
||||
const workerName = "mock-worker"
|
||||
const workerName = mockNodeName
|
||||
const workerVer = "0.1-test"
|
||||
mockHelper := &apihelper.MockAPIHelpers{}
|
||||
mockMaster := newMockMaster(mockHelper)
|
||||
|
@ -324,7 +324,7 @@ func TestSetLabels(t *testing.T) {
|
|||
}
|
||||
|
||||
mockHelper.On("GetClient").Return(mockClient, nil)
|
||||
mockHelper.On("GetNode", mockClient, workerName).Return(mockNode, nil)
|
||||
mockHelper.On("GetNode", mockClient, workerName).Return(mockNode, nil).Twice()
|
||||
mockHelper.On("PatchNode", mockClient, mockNodeName, mock.MatchedBy(jsonPatchMatcher(expectedPatches))).Return(nil)
|
||||
mockHelper.On("PatchNodeStatus", mockClient, mockNodeName, mock.MatchedBy(jsonPatchMatcher(expectedStatusPatches))).Return(nil)
|
||||
_, err := mockMaster.SetLabels(mockCtx, mockReq)
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/peer"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
label "k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
|
@ -69,6 +70,7 @@ type Args struct {
|
|||
Kubeconfig string
|
||||
LabelWhiteList utils.RegexpVal
|
||||
FeatureRulesController bool
|
||||
EnableNodeFeatureApi bool
|
||||
NoPublish bool
|
||||
EnableTaints bool
|
||||
Port int
|
||||
|
@ -87,6 +89,7 @@ type nfdMaster struct {
|
|||
*nfdController
|
||||
|
||||
args Args
|
||||
namespace string
|
||||
nodeName string
|
||||
server *grpc.Server
|
||||
stop chan struct{}
|
||||
|
@ -98,9 +101,10 @@ type nfdMaster struct {
|
|||
// NewNfdMaster creates a new NfdMaster server instance.
|
||||
func NewNfdMaster(args *Args) (NfdMaster, error) {
|
||||
nfd := &nfdMaster{args: *args,
|
||||
nodeName: os.Getenv("NODE_NAME"),
|
||||
ready: make(chan bool, 1),
|
||||
stop: make(chan struct{}, 1),
|
||||
nodeName: os.Getenv("NODE_NAME"),
|
||||
namespace: utils.GetKubernetesNamespace(),
|
||||
ready: make(chan bool, 1),
|
||||
stop: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
if args.Instance != "" {
|
||||
|
@ -144,6 +148,7 @@ func (m *nfdMaster) Run() error {
|
|||
klog.Infof("Master instance: %q", m.args.Instance)
|
||||
}
|
||||
klog.Infof("NodeName: %q", m.nodeName)
|
||||
klog.Infof("Kubernetes namespace: %q", m.namespace)
|
||||
|
||||
if m.args.Prune {
|
||||
return m.prune()
|
||||
|
@ -155,7 +160,7 @@ func (m *nfdMaster) Run() error {
|
|||
return err
|
||||
}
|
||||
klog.Info("starting nfd api controller")
|
||||
m.nfdController, err = newNfdController(kubeconfig)
|
||||
m.nfdController, err = newNfdController(kubeconfig, !m.args.EnableNodeFeatureApi)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize CRD controller: %w", err)
|
||||
}
|
||||
|
@ -170,7 +175,14 @@ func (m *nfdMaster) Run() error {
|
|||
|
||||
// Run gRPC server
|
||||
grpcErr := make(chan error, 1)
|
||||
go m.runGrpcServer(grpcErr)
|
||||
if !m.args.EnableNodeFeatureApi {
|
||||
go m.runGrpcServer(grpcErr)
|
||||
}
|
||||
|
||||
// Run updater that handles events from the nfd CRD API.
|
||||
if m.nfdController != nil {
|
||||
go m.nfdAPIUpdateHandler()
|
||||
}
|
||||
|
||||
// Notify that we're ready to accept connections
|
||||
m.ready <- true
|
||||
|
@ -245,6 +257,22 @@ func (m *nfdMaster) runGrpcServer(errChan chan<- error) {
|
|||
}
|
||||
}
|
||||
|
||||
// nfdAPIUpdateHandler handles events from the nfd API controller.
|
||||
func (m *nfdMaster) nfdAPIUpdateHandler() {
|
||||
for {
|
||||
select {
|
||||
case <-m.nfdController.updateAllNodesChan:
|
||||
if err := m.nfdAPIUpdateAllNodes(); err != nil {
|
||||
klog.Error(err)
|
||||
}
|
||||
case nodeName := <-m.nfdController.updateOneNodeChan:
|
||||
if err := m.nfdAPIUpdateOneNode(nodeName); err != nil {
|
||||
klog.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop NfdMaster
|
||||
func (m *nfdMaster) Stop() {
|
||||
m.server.GracefulStop()
|
||||
|
@ -290,16 +318,9 @@ func (m *nfdMaster) prune() error {
|
|||
klog.Infof("pruning node %q...", node.Name)
|
||||
|
||||
// Prune labels and extended resources
|
||||
err := m.updateNodeFeatures(cli, node.Name, Labels{}, Annotations{}, ExtendedResources{})
|
||||
err := m.updateNodeObject(cli, node.Name, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prune labels from node %q: %v", node.Name, err)
|
||||
}
|
||||
|
||||
// Prune taints
|
||||
err = m.setTaints(cli, []corev1.Taint{}, node.Name)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prune taints from node %q: %v", node.Name, err)
|
||||
return fmt.Errorf("failed to prune node %q: %v", node.Name, err)
|
||||
}
|
||||
|
||||
// Prune annotations
|
||||
|
@ -421,20 +442,6 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
|
|||
klog.Infof("received labeling request for node %q", r.NodeName)
|
||||
}
|
||||
|
||||
// Mix in CR-originated labels
|
||||
rawLabels := make(map[string]string)
|
||||
if r.Labels != nil {
|
||||
// NOTE: we effectively mangle the request struct by not creating a deep copy of the map
|
||||
rawLabels = r.Labels
|
||||
}
|
||||
crLabels, crTaints := m.processNodeFeatureRule(r)
|
||||
|
||||
for k, v := range crLabels {
|
||||
rawLabels[k] = v
|
||||
}
|
||||
|
||||
labels, extendedResources := filterFeatureLabels(rawLabels, m.args.ExtraLabelNs, m.args.LabelWhiteList.Regexp, m.args.ResourceLabels)
|
||||
|
||||
if !m.args.NoPublish {
|
||||
cli, err := m.apihelper.GetClient()
|
||||
if err != nil {
|
||||
|
@ -444,29 +451,126 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
|
|||
// Advertise NFD worker version as an annotation
|
||||
annotations := Annotations{m.instanceAnnotation(nfdv1alpha1.WorkerVersionAnnotation): r.NfdVersion}
|
||||
|
||||
err = m.updateNodeFeatures(cli, r.NodeName, labels, annotations, extendedResources)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to advertise labels: %v", err)
|
||||
return &pb.SetLabelsReply{}, err
|
||||
}
|
||||
|
||||
// set taints
|
||||
var taints []corev1.Taint
|
||||
if m.args.EnableTaints {
|
||||
taints = crTaints
|
||||
}
|
||||
|
||||
// Call setTaints even though the feature flag is disabled. This
|
||||
// ensures that we delete NFD owned stale taints when flag got
|
||||
// turned off.
|
||||
err = m.setTaints(cli, taints, r.NodeName)
|
||||
if err != nil {
|
||||
// Create labels et al
|
||||
if err := m.refreshNodeFeatures(cli, r.NodeName, annotations, r.GetLabels(), r.GetFeatures()); err != nil {
|
||||
return &pb.SetLabelsReply{}, err
|
||||
}
|
||||
}
|
||||
return &pb.SetLabelsReply{}, nil
|
||||
}
|
||||
|
||||
func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
|
||||
klog.Info("will process all nodes in the cluster")
|
||||
|
||||
cli, err := m.apihelper.GetClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nodes, err := m.apihelper.GetNodes(cli)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, node := range nodes.Items {
|
||||
if err := m.nfdAPIUpdateOneNode(node.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *nfdMaster) nfdAPIUpdateOneNode(nodeName string) error {
|
||||
sel := labels.SelectorFromSet(labels.Set{nfdv1alpha1.NodeFeatureObjNodeNameLabel: nodeName})
|
||||
objs, err := m.nfdController.featureLister.List(sel)
|
||||
if len(objs) == 0 {
|
||||
klog.Infof("no NodeFeature object exists for node %q, skipping...", nodeName)
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("failed to get NodeFeature resources for node %q: %w", nodeName, err)
|
||||
}
|
||||
|
||||
// Sort our objects
|
||||
sort.Slice(objs, func(i, j int) bool {
|
||||
// Objects in our nfd namespace gets into the beginning of the list
|
||||
if objs[i].Namespace == m.namespace && objs[j].Namespace != m.namespace {
|
||||
return true
|
||||
}
|
||||
if objs[i].Namespace != m.namespace && objs[j].Namespace == m.namespace {
|
||||
return false
|
||||
}
|
||||
// After the nfd namespace, sort objects by their name
|
||||
if objs[i].Name != objs[j].Name {
|
||||
return objs[i].Name < objs[j].Name
|
||||
}
|
||||
// Objects with the same name are sorted by their namespace
|
||||
return objs[i].Namespace < objs[j].Namespace
|
||||
})
|
||||
|
||||
if m.args.NoPublish {
|
||||
return nil
|
||||
}
|
||||
|
||||
klog.V(1).Infof("processing node %q, initiated by NodeFeature API", nodeName)
|
||||
|
||||
// Merge in features
|
||||
//
|
||||
// TODO: support multiple NodeFeature objects. There are two obvious options to implement this:
|
||||
// 1. Merge features of all objects into one joint object
|
||||
// 2. Change the rule api to support handle multiple objects
|
||||
// Of these #2 would probably perform better with lot less data to copy. We
|
||||
// could probably even get rid of the DeepCopy in this scenario.
|
||||
features := objs[0].DeepCopy()
|
||||
|
||||
annotations := Annotations{}
|
||||
if objs[0].Namespace == m.namespace && objs[0].Name == nodeName {
|
||||
// This is the one created by nfd-worker
|
||||
if v := objs[0].Annotations[nfdv1alpha1.WorkerVersionAnnotation]; v != "" {
|
||||
annotations[nfdv1alpha1.WorkerVersionAnnotation] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Create labels et al
|
||||
cli, err := m.apihelper.GetClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.refreshNodeFeatures(cli, nodeName, annotations, features.Spec.Labels, &features.Spec.Features); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *nfdMaster) refreshNodeFeatures(cli *kubernetes.Clientset, nodeName string, annotations, labels map[string]string, features *nfdv1alpha1.Features) error {
|
||||
if labels == nil {
|
||||
labels = make(map[string]string)
|
||||
}
|
||||
|
||||
crLabels, crTaints := m.processNodeFeatureRule(features)
|
||||
|
||||
// Mix in CR-originated labels
|
||||
for k, v := range crLabels {
|
||||
labels[k] = v
|
||||
}
|
||||
|
||||
labels, extendedResources := filterFeatureLabels(labels, m.args.ExtraLabelNs, m.args.LabelWhiteList.Regexp, m.args.ResourceLabels)
|
||||
|
||||
var taints []corev1.Taint
|
||||
if m.args.EnableTaints {
|
||||
taints = crTaints
|
||||
}
|
||||
|
||||
err := m.updateNodeObject(cli, nodeName, labels, annotations, extendedResources, taints)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to update node %q: %v", nodeName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// setTaints sets node taints and annotations based on the taints passed via
|
||||
// nodeFeatureRule custom resorce. If empty list of taints is passed, currently
|
||||
// NFD owned taints and annotations are removed from the node.
|
||||
|
@ -572,7 +676,7 @@ func authorizeClient(c context.Context, checkNodeName bool, nodeName string) err
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *nfdMaster) processNodeFeatureRule(r *pb.SetLabelsRequest) (map[string]string, []corev1.Taint) {
|
||||
func (m *nfdMaster) processNodeFeatureRule(features *nfdv1alpha1.Features) (map[string]string, []corev1.Taint) {
|
||||
if m.nfdController == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -589,9 +693,6 @@ func (m *nfdMaster) processNodeFeatureRule(r *pb.SetLabelsRequest) (map[string]s
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// Helper struct for rule processing
|
||||
features := r.GetFeatures()
|
||||
|
||||
// Process all rule CRs
|
||||
for _, spec := range ruleSpecs {
|
||||
switch {
|
||||
|
@ -621,10 +722,10 @@ func (m *nfdMaster) processNodeFeatureRule(r *pb.SetLabelsRequest) (map[string]s
|
|||
return labels, taints
|
||||
}
|
||||
|
||||
// updateNodeFeatures ensures the Kubernetes node object is up to date,
|
||||
// updateNodeObject ensures the Kubernetes node object is up to date,
|
||||
// creating new labels and extended resources where necessary and removing
|
||||
// outdated ones. Also updates the corresponding annotations.
|
||||
func (m *nfdMaster) updateNodeFeatures(cli *kubernetes.Clientset, nodeName string, labels Labels, annotations Annotations, extendedResources ExtendedResources) error {
|
||||
func (m *nfdMaster) updateNodeObject(cli *kubernetes.Clientset, nodeName string, labels Labels, annotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
|
||||
if cli == nil {
|
||||
return fmt.Errorf("no client is passed, client: %v", cli)
|
||||
}
|
||||
|
@ -677,6 +778,12 @@ func (m *nfdMaster) updateNodeFeatures(cli *kubernetes.Clientset, nodeName strin
|
|||
klog.V(1).Infof("no updates to node %q", nodeName)
|
||||
}
|
||||
|
||||
// Set taints
|
||||
err = m.setTaints(cli, taints, node.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue