mirror of
https://github.com/kubernetes-sigs/node-feature-discovery.git
synced 2024-12-15 17:50:49 +00:00
Add optionable arguments to NewWorker
Signed-off-by: Carlos Eduardo Arango Gutierrez <eduardoa@nvidia.com>
This commit is contained in:
parent
283caf2d64
commit
dd55c9fe78
5 changed files with 99 additions and 59 deletions
|
@ -83,7 +83,7 @@ func main() {
|
||||||
|
|
||||||
// Get new NfdWorker instance
|
// Get new NfdWorker instance
|
||||||
args.GrpcHealthPort = GrpcHealthPort
|
args.GrpcHealthPort = GrpcHealthPort
|
||||||
instance, err := worker.NewNfdWorker(args)
|
instance, err := worker.NewNfdWorker(worker.WithArgs(args))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "failed to initialize NfdWorker instance")
|
klog.ErrorS(err, "failed to initialize NfdWorker instance")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
. "github.com/smartystreets/goconvey/convey"
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/vektra/errors"
|
"github.com/vektra/errors"
|
||||||
|
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||||
|
|
||||||
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
|
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
|
||||||
"sigs.k8s.io/node-feature-discovery/pkg/labeler"
|
"sigs.k8s.io/node-feature-discovery/pkg/labeler"
|
||||||
|
@ -97,7 +98,8 @@ func makeFakeFeatures(names []string) (source.FeatureLabels, Labels) {
|
||||||
|
|
||||||
func TestConfigParse(t *testing.T) {
|
func TestConfigParse(t *testing.T) {
|
||||||
Convey("When parsing configuration", t, func() {
|
Convey("When parsing configuration", t, func() {
|
||||||
w, err := NewNfdWorker(&Args{})
|
w, err := NewNfdWorker(WithArgs(&Args{}),
|
||||||
|
WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
worker := w.(*nfdWorker)
|
worker := w.(*nfdWorker)
|
||||||
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"cpu": {"cpuid": {"attributeBlacklist": ["foo","bar"]}}}}`
|
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"cpu": {"cpuid": {"attributeBlacklist": ["foo","bar"]}}}}`
|
||||||
|
@ -222,13 +224,13 @@ core:
|
||||||
`)
|
`)
|
||||||
|
|
||||||
noPublish := true
|
noPublish := true
|
||||||
w, err := NewNfdWorker(&Args{
|
w, err := NewNfdWorker(WithArgs(&Args{
|
||||||
ConfigFile: configFile,
|
ConfigFile: configFile,
|
||||||
Overrides: ConfigOverrideArgs{
|
Overrides: ConfigOverrideArgs{
|
||||||
FeatureSources: &utils.StringSliceVal{"fake"},
|
FeatureSources: &utils.StringSliceVal{"fake"},
|
||||||
LabelSources: &utils.StringSliceVal{"fake"},
|
LabelSources: &utils.StringSliceVal{"fake"},
|
||||||
NoPublish: &noPublish},
|
NoPublish: &noPublish},
|
||||||
})
|
}), WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
worker := w.(*nfdWorker)
|
worker := w.(*nfdWorker)
|
||||||
|
|
||||||
|
@ -307,7 +309,8 @@ func TestNewNfdWorker(t *testing.T) {
|
||||||
|
|
||||||
Convey("without any args specified", func() {
|
Convey("without any args specified", func() {
|
||||||
args := &Args{}
|
args := &Args{}
|
||||||
w, err := NewNfdWorker(args)
|
w, err := NewNfdWorker(WithArgs(args),
|
||||||
|
WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
Convey("no error should be returned", func() {
|
Convey("no error should be returned", func() {
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
})
|
})
|
||||||
|
@ -324,7 +327,8 @@ func TestNewNfdWorker(t *testing.T) {
|
||||||
args := &Args{Overrides: ConfigOverrideArgs{
|
args := &Args{Overrides: ConfigOverrideArgs{
|
||||||
LabelSources: &utils.StringSliceVal{"fake"},
|
LabelSources: &utils.StringSliceVal{"fake"},
|
||||||
FeatureSources: &utils.StringSliceVal{"cpu"}}}
|
FeatureSources: &utils.StringSliceVal{"cpu"}}}
|
||||||
w, err := NewNfdWorker(args)
|
w, err := NewNfdWorker(WithArgs(args),
|
||||||
|
WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
Convey("no error should be returned", func() {
|
Convey("no error should be returned", func() {
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
})
|
})
|
||||||
|
@ -373,7 +377,7 @@ func TestCreateFeatureLabels(t *testing.T) {
|
||||||
|
|
||||||
func TestAdvertiseFeatureLabels(t *testing.T) {
|
func TestAdvertiseFeatureLabels(t *testing.T) {
|
||||||
Convey("When advertising labels", t, func() {
|
Convey("When advertising labels", t, func() {
|
||||||
w, err := NewNfdWorker(&Args{})
|
w, err := NewNfdWorker(WithArgs(&Args{}), WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
worker := w.(*nfdWorker)
|
worker := w.(*nfdWorker)
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/validation"
|
"k8s.io/apimachinery/pkg/util/validation"
|
||||||
"k8s.io/client-go/kubernetes"
|
k8sclient "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
klogutils "sigs.k8s.io/node-feature-discovery/pkg/utils/klog"
|
klogutils "sigs.k8s.io/node-feature-discovery/pkg/utils/klog"
|
||||||
"sigs.k8s.io/yaml"
|
"sigs.k8s.io/yaml"
|
||||||
|
@ -131,6 +131,7 @@ type nfdWorker struct {
|
||||||
kubernetesNamespace string
|
kubernetesNamespace string
|
||||||
grpcClient pb.LabelerClient
|
grpcClient pb.LabelerClient
|
||||||
healthServer *grpc.Server
|
healthServer *grpc.Server
|
||||||
|
k8sClient k8sclient.Interface
|
||||||
nfdClient *nfdclient.Clientset
|
nfdClient *nfdclient.Clientset
|
||||||
stop chan struct{} // channel for signaling stop
|
stop chan struct{} // channel for signaling stop
|
||||||
featureSources []source.FeatureSource
|
featureSources []source.FeatureSource
|
||||||
|
@ -143,30 +144,70 @@ type infiniteTicker struct {
|
||||||
*time.Ticker
|
*time.Ticker
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NfdWorkerOption sets properties of the NfdWorker instance.
|
||||||
|
type NfdWorkerOption interface {
|
||||||
|
apply(*nfdWorker)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithArgs is used for passing settings from command line arguments.
|
||||||
|
func WithArgs(args *Args) NfdWorkerOption {
|
||||||
|
return &nfdMWorkerOpt{f: func(n *nfdWorker) { n.args = *args }}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithKuberneteClient forces to use the given kubernetes client, without
|
||||||
|
// initializing one from kubeconfig.
|
||||||
|
func WithKubernetesClient(cli k8sclient.Interface) NfdWorkerOption {
|
||||||
|
return &nfdMWorkerOpt{f: func(n *nfdWorker) { n.k8sClient = cli }}
|
||||||
|
}
|
||||||
|
|
||||||
|
type nfdMWorkerOpt struct {
|
||||||
|
f func(*nfdWorker)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *nfdMWorkerOpt) apply(n *nfdWorker) {
|
||||||
|
f.f(n)
|
||||||
|
}
|
||||||
|
|
||||||
// NewNfdWorker creates new NfdWorker instance.
|
// NewNfdWorker creates new NfdWorker instance.
|
||||||
func NewNfdWorker(args *Args) (NfdWorker, error) {
|
func NewNfdWorker(opts ...NfdWorkerOption) (NfdWorker, error) {
|
||||||
nfd := &nfdWorker{
|
nfd := &nfdWorker{
|
||||||
args: *args,
|
|
||||||
config: &NFDConfig{},
|
config: &NFDConfig{},
|
||||||
kubernetesNamespace: utils.GetKubernetesNamespace(),
|
kubernetesNamespace: utils.GetKubernetesNamespace(),
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, o := range opts {
|
||||||
|
o.apply(nfd)
|
||||||
|
}
|
||||||
|
|
||||||
// Check TLS related args
|
// Check TLS related args
|
||||||
if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" {
|
if nfd.args.CertFile != "" || nfd.args.KeyFile != "" || nfd.args.CaFile != "" {
|
||||||
if args.CertFile == "" {
|
if nfd.args.CertFile == "" {
|
||||||
return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file")
|
return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file")
|
||||||
}
|
}
|
||||||
if args.KeyFile == "" {
|
if nfd.args.KeyFile == "" {
|
||||||
return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file")
|
return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file")
|
||||||
}
|
}
|
||||||
if args.CaFile == "" {
|
if nfd.args.CaFile == "" {
|
||||||
return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file")
|
return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if args.ConfigFile != "" {
|
if nfd.args.ConfigFile != "" {
|
||||||
nfd.configFilePath = filepath.Clean(args.ConfigFile)
|
nfd.configFilePath = filepath.Clean(nfd.args.ConfigFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
// k8sClient might've been set via opts by tests
|
||||||
|
if nfd.k8sClient == nil {
|
||||||
|
kubeconfig, err := utils.GetKubeconfig(nfd.args.Kubeconfig)
|
||||||
|
if err != nil {
|
||||||
|
return nfd, err
|
||||||
|
}
|
||||||
|
cli, err := k8sclient.NewForConfig(kubeconfig)
|
||||||
|
if err != nil {
|
||||||
|
return nfd, err
|
||||||
|
}
|
||||||
|
nfd.k8sClient = cli
|
||||||
}
|
}
|
||||||
|
|
||||||
return nfd, nil
|
return nfd, nil
|
||||||
|
@ -273,32 +314,33 @@ func (w *nfdWorker) Run() error {
|
||||||
labelTrigger.Reset(w.config.Core.SleepInterval.Duration)
|
labelTrigger.Reset(w.config.Core.SleepInterval.Duration)
|
||||||
defer labelTrigger.Stop()
|
defer labelTrigger.Stop()
|
||||||
|
|
||||||
|
// Create owner ref
|
||||||
|
ownerReference := []metav1.OwnerReference{}
|
||||||
// Get pod owner reference
|
// Get pod owner reference
|
||||||
podName := os.Getenv("POD_NAME")
|
podName := os.Getenv("POD_NAME")
|
||||||
client, err := w.getKubeClient()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get kube client: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
selfPod, err := client.CoreV1().Pods(w.kubernetesNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get pod %q: %w", podName, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create owner ref
|
|
||||||
ownerReference := selfPod.OwnerReferences
|
|
||||||
|
|
||||||
// Add pod owner reference if it exists
|
// Add pod owner reference if it exists
|
||||||
podUID := os.Getenv("POD_UID")
|
if podName != "" {
|
||||||
if podName != "" && podUID != "" {
|
if selfPod, err := w.k8sClient.CoreV1().Pods(w.kubernetesNamespace).Get(context.TODO(), podName, metav1.GetOptions{}); err != nil {
|
||||||
isTrue := true
|
klog.ErrorS(err, "failed to get self pod, cannot inherit ownerReference for NodeFeature")
|
||||||
ownerReference = append(ownerReference, metav1.OwnerReference{
|
return err
|
||||||
APIVersion: "v1",
|
} else {
|
||||||
Kind: "Pod",
|
ownerReference = append(ownerReference, selfPod.OwnerReferences...)
|
||||||
Name: podName,
|
}
|
||||||
UID: types.UID(podUID),
|
|
||||||
Controller: &isTrue,
|
podUID := os.Getenv("POD_UID")
|
||||||
})
|
if podUID != "" {
|
||||||
|
ownerReference = append(ownerReference, metav1.OwnerReference{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: podName,
|
||||||
|
UID: types.UID(podUID),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
klog.InfoS("Cannot append POD ownerReference to NodeFeature, POD_UID not specified")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
klog.InfoS("Cannot set NodeFeature owner references, POD_NAME not specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
w.ownerReference = ownerReference
|
w.ownerReference = ownerReference
|
||||||
|
@ -814,22 +856,6 @@ func (m *nfdWorker) getNfdClient() (*nfdclient.Clientset, error) {
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *nfdWorker) getKubeClient() (*kubernetes.Clientset, error) {
|
|
||||||
// creates the in-cluster config
|
|
||||||
kubeconfig, err := utils.GetKubeconfig(m.args.Kubeconfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// creates the clientset
|
|
||||||
clientset, err := kubernetes.NewForConfig(kubeconfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return clientset, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnmarshalJSON implements the Unmarshaler interface from "encoding/json"
|
// UnmarshalJSON implements the Unmarshaler interface from "encoding/json"
|
||||||
func (c *sourcesConfig) UnmarshalJSON(data []byte) error {
|
func (c *sourcesConfig) UnmarshalJSON(data []byte) error {
|
||||||
// First do a raw parse to get the per-source data
|
// First do a raw parse to get the per-source data
|
||||||
|
|
|
@ -90,9 +90,12 @@ func teardownTest(ctx testContext) {
|
||||||
func TestNewNfdWorker(t *testing.T) {
|
func TestNewNfdWorker(t *testing.T) {
|
||||||
Convey("When initializing new NfdWorker instance", t, func() {
|
Convey("When initializing new NfdWorker instance", t, func() {
|
||||||
Convey("When one of -cert-file, -key-file or -ca-file is missing", func() {
|
Convey("When one of -cert-file, -key-file or -ca-file is missing", func() {
|
||||||
_, err := worker.NewNfdWorker(&worker.Args{CertFile: "crt", KeyFile: "key"})
|
_, err := worker.NewNfdWorker(worker.WithArgs(&worker.Args{CertFile: "crt", KeyFile: "key"}),
|
||||||
_, err2 := worker.NewNfdWorker(&worker.Args{KeyFile: "key", CaFile: "ca"})
|
worker.WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
_, err3 := worker.NewNfdWorker(&worker.Args{CertFile: "crt", CaFile: "ca"})
|
_, err2 := worker.NewNfdWorker(worker.WithArgs(&worker.Args{KeyFile: "key", CaFile: "ca"}),
|
||||||
|
worker.WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
|
_, err3 := worker.NewNfdWorker(worker.WithArgs(&worker.Args{CertFile: "crt", CaFile: "ca"}),
|
||||||
|
worker.WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
Convey("An error should be returned", func() {
|
Convey("An error should be returned", func() {
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
So(err2, ShouldNotBeNil)
|
So(err2, ShouldNotBeNil)
|
||||||
|
@ -112,7 +115,8 @@ func TestRun(t *testing.T) {
|
||||||
Oneshot: true,
|
Oneshot: true,
|
||||||
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
|
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
|
||||||
}
|
}
|
||||||
fooasdf, _ := worker.NewNfdWorker(args)
|
fooasdf, _ := worker.NewNfdWorker(worker.WithArgs(args),
|
||||||
|
worker.WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
err := fooasdf.Run()
|
err := fooasdf.Run()
|
||||||
Convey("No error should be returned", func() {
|
Convey("No error should be returned", func() {
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
|
@ -141,7 +145,8 @@ func TestRunTls(t *testing.T) {
|
||||||
Oneshot: true,
|
Oneshot: true,
|
||||||
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
|
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
|
||||||
}
|
}
|
||||||
w, _ := worker.NewNfdWorker(&workerArgs)
|
w, _ := worker.NewNfdWorker(worker.WithArgs(&workerArgs),
|
||||||
|
worker.WithKubernetesClient(fakeclient.NewSimpleClientset()))
|
||||||
err := w.Run()
|
err := w.Run()
|
||||||
Convey("No error should be returned", func() {
|
Convey("No error should be returned", func() {
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
|
|
|
@ -224,6 +224,11 @@ func createRoleWorker(ctx context.Context, cs clientset.Interface, ns string) (*
|
||||||
Resources: []string{"nodefeatures"},
|
Resources: []string{"nodefeatures"},
|
||||||
Verbs: []string{"create", "get", "update"},
|
Verbs: []string{"create", "get", "update"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
APIGroups: []string{""},
|
||||||
|
Resources: []string{"pods"},
|
||||||
|
Verbs: []string{"get"},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return cs.RbacV1().Roles(ns).Update(ctx, cr, metav1.UpdateOptions{})
|
return cs.RbacV1().Roles(ns).Update(ctx, cr, metav1.UpdateOptions{})
|
||||||
|
|
Loading…
Reference in a new issue