mirror of
synced 2025-03-15 04:57:56 +00:00
Merge pull request #291 from uniemimu/master
master: add extended resource support
This commit is contained in:
8 changed files with 268 additions and 23 deletions
@ -8,6 +8,7 @@
- [Feature discovery](#feature-discovery)
- [Feature sources](#feature-sources)
- [Feature labels](#feature-labels)
- [Extended resources (experimental)](#extended-resources-experimental)
- [Getting started](#getting-started)
- [System requirements](#system-requirements)
- [Usage](#usage)
@ -53,7 +54,7 @@ nfd-master.
nfd-master [--no-publish] [--label-whitelist=<pattern>] [--port=<port>]
[--ca-file=<path>] [--cert-file=<path>] [--key-file=<path>]
[--verify-node-name] [--extra-label-ns=<list>]
[--verify-node-name] [--extra-label-ns=<list>] [--resource-labels=<list>]
nfd-master -h | --help
nfd-master --version
@ -76,6 +77,8 @@ nfd-master.
publish to the Kubernetes API server. [Default: ]
--extra-label-ns=<list> Comma separated list of allowed extra label namespaces
[Default: ]
--resource-labels=<list> Comma separated list of labels to be exposed as extended resources.
[Default: ]
### NFD-Worker
@ -441,6 +444,37 @@ temporary file (outside the `source.d` and `features.d` directories), and,
atomically create/update the original file by doing a filesystem move
## Extended resources (experimental)
This feature is experimental and by no means a replacement for the usage of
device plugins.
Labels which have integer values, can be promoted to Kubernetes extended
resources by listing them to the master `--resource-labels` command line flag.
These labels won't then show in the node label section, they will appear only
as extended resources.
An example use-case for the extended resources could be based on a hook which
creates a label for the node SGX EPC memory section size. By giving the name of
that label in the `--resource-labels` flag, that value will then turn into an
extended resource of the node, allowing PODs to request that resource and the
Kubernetes scheduler to schedule such PODs to only those nodes which have a
sufficient capacity of said resource left.
Similar to labels, the default namespace `feature.node.kubernetes.io` is
automatically prefixed to the extended resource, if the promoted label doesn't
have a namespace.
Example usage of the command line arguments, using a new namespace:
`nfd-master --resource-labels=my_source-my.feature,sgx.some.ns/epc --extra-label-ns=sgx.some.ns`
The above would result in following extended resources provided that related
labels exist:
sgx.some.ns/epc: <label value>
feature.node.kubernetes.io/my_source-my.feature: <label value>
## Getting started
For a stable version with ready-built images see the
@ -65,7 +65,7 @@ func argsParse(argv []string) (master.Args, error) {
%s [--no-publish] [--label-whitelist=<pattern>] [--port=<port>]
[--ca-file=<path>] [--cert-file=<path>] [--key-file=<path>]
[--verify-node-name] [--extra-label-ns=<list>]
[--verify-node-name] [--extra-label-ns=<list>] [--resource-labels=<list>]
%s -h | --help
%s --version
@ -87,6 +87,8 @@ func argsParse(argv []string) (master.Args, error) {
--label-whitelist=<pattern> Regular expression to filter label names to
publish to the Kubernetes API server. [Default: ]
--extra-label-ns=<list> Comma separated list of allowed extra label namespaces
[Default: ]
--resource-labels=<list> Comma separated list of labels to be exposed as extended resources.
[Default: ]`,
@ -113,6 +115,7 @@ func argsParse(argv []string) (master.Args, error) {
args.VerifyNodeName = arguments["--verify-node-name"].(bool)
args.ExtraLabelNs = strings.Split(arguments["--extra-label-ns"].(string), ",")
args.ResourceLabels = strings.Split(arguments["--resource-labels"].(string), ",")
return args, nil
@ -18,6 +18,9 @@ rules:
- ""
- nodes
# when using command line flag --resource-labels to create extended resources
# you will need to uncomment "- nodes/status"
# - nodes/status
- get
- patch
@ -31,4 +31,7 @@ type APIHelpers interface {
// UpdateNode updates the node via the API server using a client.
UpdateNode(*k8sclient.Clientset, *api.Node) error
// PatchStatus updates the node status via the API server using a client.
PatchStatus(*k8sclient.Clientset, string, interface{}) error
@ -17,8 +17,11 @@ limitations under the License.
package apihelper
import (
api "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
@ -59,3 +62,13 @@ func (h K8sHelpers) UpdateNode(c *k8sclient.Clientset, n *api.Node) error {
return nil
func (h K8sHelpers) PatchStatus(c *k8sclient.Clientset, nodeName string, marshalable interface{}) error {
// Send the updated node to the apiserver.
patch, err := json.Marshal(marshalable)
if err == nil {
_, err = c.CoreV1().Nodes().Patch(nodeName, types.JSONPatchType, patch, "status")
return err
@ -59,6 +59,20 @@ func (_m *MockAPIHelpers) GetNode(_a0 *kubernetes.Clientset, _a1 string) (*v1.No
return r0, r1
// PatchStatus provides a mock function with given fields: _a0, _a1, _a2
func (_m *MockAPIHelpers) PatchStatus(_a0 *kubernetes.Clientset, _a1 string, _a2 interface{}) error {
ret := _m.Called(_a0, _a1, _a2)
var r0 error
if rf, ok := ret.Get(0).(func(*kubernetes.Clientset, string, interface{}) error); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Error(0)
return r0
// UpdateNode provides a mock function with given fields: _a0, _a1
func (_m *MockAPIHelpers) UpdateNode(_a0 *kubernetes.Clientset, _a1 *v1.Node) error {
ret := _m.Called(_a0, _a1)
@ -23,9 +23,11 @@ import (
. "github.com/smartystreets/goconvey/convey"
api "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sclient "k8s.io/client-go/kubernetes"
@ -43,15 +45,18 @@ func init() {
func newMockNode() *api.Node {
n := api.Node{}
n.Name = mockNodeName
n.Labels = map[string]string{}
n.Annotations = map[string]string{}
n.Status.Capacity = api.ResourceList{}
return &n
func TestUpdateNodeFeatures(t *testing.T) {
Convey("When I update the node using fake client", t, func() {
fakeFeatureLabels := map[string]string{"source-feature.1": "val1", "source-feature.2": "val2", "source-feature.3": "val3"}
fakeFeatureLabels := map[string]string{"source-feature.1": "1", "source-feature.2": "2", "source-feature.3": "val3"}
fakeAnnotations := map[string]string{"version": version.Get()}
fakeExtResources := ExtendedResources{"source-feature.1": "", "source-feature.2": ""}
fakeFeatureLabelNames := make([]string, 0, len(fakeFeatureLabels))
for k, _ := range fakeFeatureLabels {
fakeFeatureLabelNames = append(fakeFeatureLabelNames, k)
@ -70,7 +75,8 @@ func TestUpdateNodeFeatures(t *testing.T) {
mockAPIHelper.On("GetClient").Return(mockClient, nil)
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(mockNode, nil).Once()
mockAPIHelper.On("UpdateNode", mockClient, mockNode).Return(nil).Once()
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations)
mockAPIHelper.On("PatchStatus", mockClient, mockNodeName, mock.Anything).Return(nil).Twice()
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
Convey("Error is nil", func() {
So(err, ShouldBeNil)
@ -90,7 +96,7 @@ func TestUpdateNodeFeatures(t *testing.T) {
Convey("When I fail to update the node with feature labels", func() {
expectedError := errors.New("fake error")
mockAPIHelper.On("GetClient").Return(nil, expectedError)
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations)
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
Convey("Error is produced", func() {
So(err, ShouldEqual, expectedError)
@ -100,7 +106,7 @@ func TestUpdateNodeFeatures(t *testing.T) {
Convey("When I fail to get a mock client while updating feature labels", func() {
expectedError := errors.New("fake error")
mockAPIHelper.On("GetClient").Return(nil, expectedError)
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations)
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
Convey("Error is produced", func() {
So(err, ShouldEqual, expectedError)
@ -111,7 +117,7 @@ func TestUpdateNodeFeatures(t *testing.T) {
expectedError := errors.New("fake error")
mockAPIHelper.On("GetClient").Return(mockClient, nil)
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(nil, expectedError).Once()
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations)
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
Convey("Error is produced", func() {
So(err, ShouldEqual, expectedError)
@ -123,7 +129,7 @@ func TestUpdateNodeFeatures(t *testing.T) {
mockAPIHelper.On("GetClient").Return(mockClient, nil)
mockAPIHelper.On("GetNode", mockClient, mockNodeName).Return(mockNode, nil).Once()
mockAPIHelper.On("UpdateNode", mockClient, mockNode).Return(expectedError).Once()
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations)
err := updateNodeFeatures(mockAPIHelper, mockNodeName, fakeFeatureLabels, fakeAnnotations, fakeExtResources)
Convey("Error is produced", func() {
So(err, ShouldEqual, expectedError)
@ -178,6 +184,72 @@ func TestUpdateMasterNode(t *testing.T) {
func TestAddingExtResources(t *testing.T) {
Convey("When adding extended resources", t, func() {
Convey("When there are no matching labels", func() {
mockNode := newMockNode()
mockResourceLabels := ExtendedResources{}
resourceOps := getExtendedResourceOps(mockNode, mockResourceLabels)
So(len(resourceOps), ShouldEqual, 0)
Convey("When there are matching labels", func() {
mockNode := newMockNode()
mockResourceLabels := ExtendedResources{"feature-1": "1", "feature-2": "2"}
resourceOps := getExtendedResourceOps(mockNode, mockResourceLabels)
So(len(resourceOps), ShouldBeGreaterThan, 0)
Convey("When the resource already exists", func() {
mockNode := newMockNode()
mockNode.Status.Capacity[api.ResourceName(LabelNs+"feature-1")] = *resource.NewQuantity(1, resource.BinarySI)
mockResourceLabels := ExtendedResources{"feature-1": "1"}
resourceOps := getExtendedResourceOps(mockNode, mockResourceLabels)
So(len(resourceOps), ShouldEqual, 0)
Convey("When the resource already exists but its capacity has changed", func() {
mockNode := newMockNode()
mockNode.Status.Capacity[api.ResourceName(LabelNs+"feature-1")] = *resource.NewQuantity(2, resource.BinarySI)
mockResourceLabels := ExtendedResources{"feature-1": "1"}
resourceOps := getExtendedResourceOps(mockNode, mockResourceLabels)
So(len(resourceOps), ShouldBeGreaterThan, 0)
func TestRemovingExtResources(t *testing.T) {
Convey("When removing extended resources", t, func() {
Convey("When none are removed", func() {
mockNode := newMockNode()
mockResourceLabels := ExtendedResources{"feature-1": "1", "feature-2": "2"}
mockNode.Annotations[AnnotationNs+"extended-resources"] = "feature-1,feature-2"
mockNode.Status.Capacity[api.ResourceName(LabelNs+"feature-1")] = *resource.NewQuantity(1, resource.BinarySI)
mockNode.Status.Capacity[api.ResourceName(LabelNs+"feature-2")] = *resource.NewQuantity(2, resource.BinarySI)
resourceOps := getExtendedResourceOps(mockNode, mockResourceLabels)
So(len(resourceOps), ShouldEqual, 0)
Convey("When the related label is gone", func() {
mockNode := newMockNode()
mockResourceLabels := ExtendedResources{"feature-4": "", "feature-2": "2"}
mockNode.Annotations[AnnotationNs+"extended-resources"] = "feature-4,feature-2"
mockNode.Status.Capacity[api.ResourceName(LabelNs+"feature-4")] = *resource.NewQuantity(4, resource.BinarySI)
mockNode.Status.Capacity[api.ResourceName(LabelNs+"feature-2")] = *resource.NewQuantity(2, resource.BinarySI)
resourceOps := getExtendedResourceOps(mockNode, mockResourceLabels)
So(len(resourceOps), ShouldBeGreaterThan, 0)
Convey("When the extended resource is no longer wanted", func() {
mockNode := newMockNode()
mockNode.Status.Capacity[api.ResourceName(LabelNs+"feature-1")] = *resource.NewQuantity(1, resource.BinarySI)
mockNode.Status.Capacity[api.ResourceName(LabelNs+"feature-2")] = *resource.NewQuantity(2, resource.BinarySI)
mockResourceLabels := ExtendedResources{"feature-2": "2"}
mockNode.Annotations[AnnotationNs+"extended-resources"] = "feature-1,feature-2"
resourceOps := getExtendedResourceOps(mockNode, mockResourceLabels)
So(len(resourceOps), ShouldBeGreaterThan, 0)
func TestSetLabels(t *testing.T) {
Convey("When servicing SetLabels request", t, func() {
const workerName = "mock-worker"
@ -197,6 +269,7 @@ func TestSetLabels(t *testing.T) {
expectedAnnotations := map[string]string{"worker.version": workerVer}
expectedAnnotations["feature-labels"] = strings.Join(mockLabelNames, ",")
expectedAnnotations["extended-resources"] = ""
Convey("When node update succeeds", func() {
mockHelper.On("GetClient").Return(mockClient, nil)
@ -231,7 +304,7 @@ func TestSetLabels(t *testing.T) {
So(len(mockNode.Labels), ShouldEqual, 1)
So(mockNode.Labels, ShouldResemble, map[string]string{LabelNs + "feature-2": "val-2"})
a := map[string]string{AnnotationNs + "worker.version": workerVer, AnnotationNs + "feature-labels": "feature-2"}
a := map[string]string{AnnotationNs + "worker.version": workerVer, AnnotationNs + "feature-labels": "feature-2", AnnotationNs + "extended-resources": ""}
So(len(mockNode.Annotations), ShouldEqual, len(a))
So(mockNode.Annotations, ShouldResemble, a)
@ -254,7 +327,7 @@ func TestSetLabels(t *testing.T) {
So(len(mockNode.Labels), ShouldEqual, 2)
So(mockNode.Labels, ShouldResemble, map[string]string{LabelNs + "feature-1": "val-1", "valid.ns/feature-2": "val-2"})
a := map[string]string{AnnotationNs + "worker.version": workerVer, AnnotationNs + "feature-labels": "feature-1,valid.ns/feature-2"}
a := map[string]string{AnnotationNs + "worker.version": workerVer, AnnotationNs + "feature-labels": "feature-1,valid.ns/feature-2", AnnotationNs + "extended-resources": ""}
So(len(mockNode.Annotations), ShouldEqual, len(a))
So(mockNode.Annotations, ShouldResemble, a)
@ -26,6 +26,7 @@ import (
@ -57,6 +58,9 @@ var (
// Labels are a Kubernetes representation of discovered features.
type Labels map[string]string
// ExtendedResources are k8s extended resources which are created from discovered features.
type ExtendedResources map[string]string
// Annotations are used for NFD-related node metadata
type Annotations map[string]string
@ -70,6 +74,7 @@ type Args struct {
NoPublish bool
Port int
VerifyNodeName bool
ResourceLabels []string
type NfdMaster interface {
@ -84,6 +89,21 @@ type nfdMaster struct {
ready chan bool
// statusOp is a json marshaling helper used for patching node status
type statusOp struct {
Op string `json:"op"`
Path string `json:"path"`
Value string `json:"value,omitempty"`
func createStatusOp(verb string, resource string, path string, value string) statusOp {
if !strings.Contains(resource, "/") {
resource = LabelNs + resource
res := strings.ReplaceAll(resource, "/", "~1")
return statusOp{verb, "/status/" + path + "/" + res, value}
// Create new NfdMaster server instance.
func NewNfdMaster(args Args) (*nfdMaster, error) {
nfd := &nfdMaster{args: args, ready: make(chan bool, 1)}
@ -204,7 +224,7 @@ func updateMasterNode(helper apihelper.APIHelpers) error {
// Filter labels by namespace and name whitelist
func filterFeatureLabels(labels Labels, extraLabelNs []string, labelWhiteList *regexp.Regexp) Labels {
func filterFeatureLabels(labels Labels, extraLabelNs []string, labelWhiteList *regexp.Regexp, extendedResourceNames []string) (Labels, ExtendedResources) {
for label := range labels {
split := strings.SplitN(label, "/", 2)
name := split[0]
@ -229,7 +249,24 @@ func filterFeatureLabels(labels Labels, extraLabelNs []string, labelWhiteList *r
delete(labels, label)
return labels
// Remove labels which are intended to be extended resources
extendedResources := ExtendedResources{}
for _, extendedResourceName := range extendedResourceNames {
// remove possibly given default LabelNs to keep annotations shorter
extendedResourceName = strings.TrimPrefix(extendedResourceName, LabelNs)
if _, ok := labels[extendedResourceName]; ok {
if _, err := strconv.Atoi(labels[extendedResourceName]); err != nil {
stderrLogger.Printf("bad label value encountered for extended resource: %s", err.Error())
continue // non-numeric label can't be used
extendedResources[extendedResourceName] = labels[extendedResourceName]
delete(labels, extendedResourceName)
return labels, extendedResources
// Implement LabelerServer
@ -265,19 +302,28 @@ func (s *labelerServer) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*p
stdoutLogger.Printf("REQUEST Node: %s NFD-version: %s Labels: %s", r.NodeName, r.NfdVersion, r.Labels)
labels := filterFeatureLabels(r.Labels, s.args.ExtraLabelNs, s.args.LabelWhiteList)
labels, extendedResources := filterFeatureLabels(r.Labels, s.args.ExtraLabelNs, s.args.LabelWhiteList, s.args.ResourceLabels)
if !s.args.NoPublish {
// Advertise NFD worker version and label names as annotations
keys := make([]string, 0, len(labels))
for k, _ := range labels {
keys = append(keys, k)
// Advertise NFD worker version, label names and extended resources as annotations
labelKeys := make([]string, 0, len(labels))
for k := range labels {
labelKeys = append(labelKeys, k)
annotations := Annotations{"worker.version": r.NfdVersion,
"feature-labels": strings.Join(keys, ",")}
err := updateNodeFeatures(s.apiHelper, r.NodeName, labels, annotations)
extendedResourceKeys := make([]string, 0, len(extendedResources))
for key := range extendedResources {
extendedResourceKeys = append(extendedResourceKeys, key)
annotations := Annotations{"worker.version": r.NfdVersion,
"feature-labels": strings.Join(labelKeys, ","),
"extended-resources": strings.Join(extendedResourceKeys, ","),
err := updateNodeFeatures(s.apiHelper, r.NodeName, labels, annotations, extendedResources)
if err != nil {
stderrLogger.Printf("failed to advertise labels: %s", err.Error())
return &pb.SetLabelsReply{}, err
@ -288,7 +334,7 @@ func (s *labelerServer) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*p
// advertiseFeatureLabels advertises the feature labels to a Kubernetes node
// via the API server.
func updateNodeFeatures(helper apihelper.APIHelpers, nodeName string, labels Labels, annotations Annotations) error {
func updateNodeFeatures(helper apihelper.APIHelpers, nodeName string, labels Labels, annotations Annotations, extendedResources ExtendedResources) error {
cli, err := helper.GetClient()
if err != nil {
return err
@ -300,6 +346,9 @@ func updateNodeFeatures(helper apihelper.APIHelpers, nodeName string, labels Lab
return err
// Resolve publishable extended resources before node is modified
statusOps := getExtendedResourceOps(node, extendedResources)
// Remove old labels
if l, ok := node.Annotations[AnnotationNs+"feature-labels"]; ok {
oldLabels := strings.Split(l, ",")
@ -323,7 +372,16 @@ func updateNodeFeatures(helper apihelper.APIHelpers, nodeName string, labels Lab
return err
return nil
// patch node status with extended resource changes
if len(statusOps) > 0 {
err = helper.PatchStatus(cli, node.Name, statusOps)
if err != nil {
stderrLogger.Printf("error while patching extended resources: %s", err.Error())
return err
return err
// Remove any labels having the given prefix
@ -346,6 +404,42 @@ func removeLabels(n *api.Node, labelNames []string) {
// getExtendedResourceOps returns a slice of operations to perform on the node status
func getExtendedResourceOps(n *api.Node, extendedResources ExtendedResources) []statusOp {
var statusOps []statusOp
oldResources := strings.Split(n.Annotations[AnnotationNs+"extended-resources"], ",")
// figure out which resources to remove
for _, resource := range oldResources {
if _, ok := n.Status.Capacity[api.ResourceName(addNs(resource, LabelNs))]; ok {
// check if the ext resource is still needed
_, extResNeeded := extendedResources[resource]
if !extResNeeded {
statusOps = append(statusOps, createStatusOp("remove", resource, "capacity", ""))
statusOps = append(statusOps, createStatusOp("remove", resource, "allocatable", ""))
// figure out which resources to replace and which to add
for resource, value := range extendedResources {
// check if the extended resource already exists with the same capacity in the node
if quantity, ok := n.Status.Capacity[api.ResourceName(addNs(resource, LabelNs))]; ok {
val, _ := quantity.AsInt64()
if strconv.FormatInt(val, 10) != value {
statusOps = append(statusOps, createStatusOp("replace", resource, "capacity", value))
statusOps = append(statusOps, createStatusOp("replace", resource, "allocatable", value))
} else {
statusOps = append(statusOps, createStatusOp("add", resource, "capacity", value))
// "allocatable" gets added implicitly after adding to capacity
return statusOps
// Add NFD labels to a Node object.
func addLabels(n *api.Node, labels map[string]string) {
for k, v := range labels {
@ -363,3 +457,11 @@ func addAnnotations(n *api.Node, annotations map[string]string) {
n.Annotations[AnnotationNs+k] = v
// addNs adds a namespace if one isn't already found from src string
func addNs(src string, nsToAdd string) string {
if strings.Contains(src, "/") {
return src
return nsToAdd + src
Add table
Reference in a new issue