From 1026d91d12a35664059dc5c1ef410a74d31b0af6 Mon Sep 17 00:00:00 2001 From: Markus Lehtonen Date: Fri, 23 Dec 2022 10:45:07 +0200 Subject: [PATCH] worker: move code Simplify code bu dropping the unnecessary base client package. --- cmd/nfd-worker/main.go | 2 +- pkg/nfd-client/base.go | 136 ------------------ .../nfd-worker-internal_test.go | 2 +- .../worker => nfd-worker}/nfd-worker.go | 109 ++++++++++---- .../worker => nfd-worker}/nfd-worker_test.go | 30 ++-- 5 files changed, 100 insertions(+), 179 deletions(-) delete mode 100644 pkg/nfd-client/base.go rename pkg/{nfd-client/worker => nfd-worker}/nfd-worker-internal_test.go (99%) rename pkg/{nfd-client/worker => nfd-worker}/nfd-worker.go (86%) rename pkg/{nfd-client/worker => nfd-worker}/nfd-worker_test.go (78%) diff --git a/cmd/nfd-worker/main.go b/cmd/nfd-worker/main.go index 30e3290d1..2e6675e22 100644 --- a/cmd/nfd-worker/main.go +++ b/cmd/nfd-worker/main.go @@ -24,7 +24,7 @@ import ( "k8s.io/klog/v2" - "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker" + worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/version" ) diff --git a/pkg/nfd-client/base.go b/pkg/nfd-client/base.go deleted file mode 100644 index e8f53ed05..000000000 --- a/pkg/nfd-client/base.go +++ /dev/null @@ -1,136 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package nfdclient - -import ( - "crypto/tls" - "crypto/x509" - "fmt" - "os" - "time" - - "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "k8s.io/klog/v2" - - "sigs.k8s.io/node-feature-discovery/pkg/utils" -) - -// NfdClient defines a common interface for NFD clients. -type NfdClient interface { - Run() error - Stop() -} - -// NfdBaseClient is a common base type for handling connections to nfd-master. -type NfdBaseClient struct { - args Args - clientConn *grpc.ClientConn -} - -// Args holds the common command line arguments for all nfd clients. -type Args struct { - CaFile string - CertFile string - KeyFile string - Kubeconfig string - Server string - ServerNameOverride string - - Klog map[string]*utils.KlogFlagVal -} - -// NewNfdBaseClient creates a new NfdBaseClient instance. -func NewNfdBaseClient(args *Args) (NfdBaseClient, error) { - nfd := NfdBaseClient{args: *args} - - // Check TLS related args - if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" { - if args.CertFile == "" { - return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file") - } - if args.KeyFile == "" { - return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file") - } - if args.CaFile == "" { - return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file") - } - } - - return nfd, nil -} - -// ClientConn returns the grpc ClientConn object. -func (w *NfdBaseClient) ClientConn() *grpc.ClientConn { return w.clientConn } - -// Connect creates a gRPC client connection to nfd-master. -func (w *NfdBaseClient) Connect() error { - // Check that if a connection already exists - if w.clientConn != nil { - return fmt.Errorf("client connection already exists") - } - - // Dial and create a client - dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - dialOpts := []grpc.DialOption{grpc.WithBlock()} - if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" { - // Load client cert for client authentication - cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile) - if err != nil { - return fmt.Errorf("failed to load client certificate: %v", err) - } - // Load CA cert for server cert verification - caCert, err := os.ReadFile(w.args.CaFile) - if err != nil { - return fmt.Errorf("failed to read root certificate file: %v", err) - } - caPool := x509.NewCertPool() - if ok := caPool.AppendCertsFromPEM(caCert); !ok { - return fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile) - } - // Create TLS config - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: caPool, - ServerName: w.args.ServerNameOverride, - MinVersion: tls.VersionTLS13, - } - dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) - } else { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) - } - klog.Infof("connecting to nfd-master at %s ...", w.args.Server) - conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...) - if err != nil { - return err - } - w.clientConn = conn - - return nil -} - -// Disconnect closes the connection to NFD master -func (w *NfdBaseClient) Disconnect() { - if w.clientConn != nil { - klog.Infof("closing connection to nfd-master ...") - w.clientConn.Close() - } - w.clientConn = nil -} diff --git a/pkg/nfd-client/worker/nfd-worker-internal_test.go b/pkg/nfd-worker/nfd-worker-internal_test.go similarity index 99% rename from pkg/nfd-client/worker/nfd-worker-internal_test.go rename to pkg/nfd-worker/nfd-worker-internal_test.go index 714197712..94852ee3a 100644 --- a/pkg/nfd-client/worker/nfd-worker-internal_test.go +++ b/pkg/nfd-worker/nfd-worker-internal_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package worker +package nfdworker import ( "os" diff --git a/pkg/nfd-client/worker/nfd-worker.go b/pkg/nfd-worker/nfd-worker.go similarity index 86% rename from pkg/nfd-client/worker/nfd-worker.go rename to pkg/nfd-worker/nfd-worker.go index 1f7443cf0..304eeb698 100644 --- a/pkg/nfd-client/worker/nfd-worker.go +++ b/pkg/nfd-worker/nfd-worker.go @@ -14,9 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -package worker +package nfdworker import ( + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" "os" @@ -27,6 +29,9 @@ import ( "time" "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/klog/v2" @@ -38,7 +43,6 @@ import ( nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/pkg/apis/nfd/v1alpha1" nfdclient "sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned" pb "sigs.k8s.io/node-feature-discovery/pkg/labeler" - clientcommon "sigs.k8s.io/node-feature-discovery/pkg/nfd-client" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/node-feature-discovery/source" @@ -57,6 +61,12 @@ import ( _ "sigs.k8s.io/node-feature-discovery/source/usb" ) +// NfdWorker is the interface for nfd-worker daemon +type NfdWorker interface { + Run() error + Stop() +} + // NFDConfig contains the configuration settings of NfdWorker. type NFDConfig struct { Core coreConfig @@ -80,14 +90,18 @@ type Labels map[string]string // Args are the command line arguments of NfdWorker. type Args struct { - clientcommon.Args - + CaFile string + CertFile string ConfigFile string EnableNodeFeatureApi bool + KeyFile string + Klog map[string]*utils.KlogFlagVal + Kubeconfig string Oneshot bool Options string + Server string + ServerNameOverride string - Klog map[string]*utils.KlogFlagVal Overrides ConfigOverrideArgs } @@ -100,10 +114,9 @@ type ConfigOverrideArgs struct { } type nfdWorker struct { - clientcommon.NfdBaseClient - args Args certWatch *utils.FsWatcher + clientConn *grpc.ClientConn configFilePath string config *NFDConfig kubernetesNamespace string @@ -119,21 +132,27 @@ type duration struct { } // NewNfdWorker creates new NfdWorker instance. -func NewNfdWorker(args *Args) (clientcommon.NfdClient, error) { - base, err := clientcommon.NewNfdBaseClient(&args.Args) - if err != nil { - return nil, err - } - +func NewNfdWorker(args *Args) (NfdWorker, error) { nfd := &nfdWorker{ - NfdBaseClient: base, - args: *args, config: &NFDConfig{}, kubernetesNamespace: utils.GetKubernetesNamespace(), stop: make(chan struct{}, 1), } + // Check TLS related args + if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" { + if args.CertFile == "" { + return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file") + } + if args.KeyFile == "" { + return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file") + } + if args.CaFile == "" { + return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file") + } + } + if args.ConfigFile != "" { nfd.configFilePath = filepath.Clean(args.ConfigFile) } @@ -175,7 +194,7 @@ func (w *nfdWorker) Run() error { return err } - defer w.GrpcDisconnect() + defer w.grpcDisconnect() labelTrigger := time.After(0) for { @@ -214,7 +233,7 @@ func (w *nfdWorker) Run() error { } // Manage connection to master if w.config.Core.NoPublish || !w.args.EnableNodeFeatureApi { - w.GrpcDisconnect() + w.grpcDisconnect() } // Always re-label after a re-config event. This way the new config @@ -223,7 +242,7 @@ func (w *nfdWorker) Run() error { case <-w.certWatch.Events: klog.Infof("TLS certificate update, renewing connection to nfd-master") - w.GrpcDisconnect() + w.grpcDisconnect() case <-w.stop: klog.Infof("shutting down nfd-worker") @@ -249,18 +268,60 @@ func (w *nfdWorker) getGrpcClient() (pb.LabelerClient, error) { return w.grpcClient, nil } - if err := w.NfdBaseClient.Connect(); err != nil { - return nil, err + // Check that if a connection already exists + if w.clientConn != nil { + return nil, fmt.Errorf("client connection already exists") } - w.grpcClient = pb.NewLabelerClient(w.ClientConn()) + // Dial and create a client + dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + dialOpts := []grpc.DialOption{grpc.WithBlock()} + if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" { + // Load client cert for client authentication + cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile) + if err != nil { + return nil, fmt.Errorf("failed to load client certificate: %v", err) + } + // Load CA cert for server cert verification + caCert, err := os.ReadFile(w.args.CaFile) + if err != nil { + return nil, fmt.Errorf("failed to read root certificate file: %v", err) + } + caPool := x509.NewCertPool() + if ok := caPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile) + } + // Create TLS config + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caPool, + ServerName: w.args.ServerNameOverride, + MinVersion: tls.VersionTLS13, + } + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + } else { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + klog.Infof("connecting to nfd-master at %s ...", w.args.Server) + conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...) + if err != nil { + return nil, err + } + w.clientConn = conn + + w.grpcClient = pb.NewLabelerClient(w.clientConn) return w.grpcClient, nil } -// GrpcDisconnect closes the gRPC connection to NFD master -func (w *nfdWorker) GrpcDisconnect() { - w.NfdBaseClient.Disconnect() +// grpcDisconnect closes the gRPC connection to NFD master +func (w *nfdWorker) grpcDisconnect() { + if w.clientConn != nil { + klog.Infof("closing connection to nfd-master ...") + w.clientConn.Close() + } + w.clientConn = nil w.grpcClient = nil } func (c *coreConfig) sanitize() { diff --git a/pkg/nfd-client/worker/nfd-worker_test.go b/pkg/nfd-worker/nfd-worker_test.go similarity index 78% rename from pkg/nfd-client/worker/nfd-worker_test.go rename to pkg/nfd-worker/nfd-worker_test.go index adf2b84f1..b80aa27b3 100644 --- a/pkg/nfd-client/worker/nfd-worker_test.go +++ b/pkg/nfd-worker/nfd-worker_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package worker_test +package nfdworker_test import ( "fmt" @@ -25,9 +25,8 @@ import ( . "github.com/smartystreets/goconvey/convey" - nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client" - "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker" master "sigs.k8s.io/node-feature-discovery/pkg/nfd-master" + worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/test/data" ) @@ -76,9 +75,9 @@ func teardownTest(ctx testContext) { func TestNewNfdWorker(t *testing.T) { Convey("When initializing new NfdWorker instance", t, func() { Convey("When one of -cert-file, -key-file or -ca-file is missing", func() { - _, err := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", KeyFile: "key"}}) - _, err2 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{KeyFile: "key", CaFile: "ca"}}) - _, err3 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", CaFile: "ca"}}) + _, err := worker.NewNfdWorker(&worker.Args{CertFile: "crt", KeyFile: "key"}) + _, err2 := worker.NewNfdWorker(&worker.Args{KeyFile: "key", CaFile: "ca"}) + _, err3 := worker.NewNfdWorker(&worker.Args{CertFile: "crt", CaFile: "ca"}) Convey("An error should be returned", func() { So(err, ShouldNotBeNil) So(err2, ShouldNotBeNil) @@ -94,8 +93,7 @@ func TestRun(t *testing.T) { Convey("When running nfd-worker against nfd-master", t, func() { Convey("When publishing features from fake source", func() { args := &worker.Args{ - Args: nfdclient.Args{ - Server: "localhost:8192"}, + Server: "localhost:8192", Oneshot: true, Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}}, } @@ -120,15 +118,13 @@ func TestRunTls(t *testing.T) { Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() { Convey("When publishing features from fake source", func() { workerArgs := worker.Args{ - Args: nfdclient.Args{ - CaFile: data.FilePath("ca.crt"), - CertFile: data.FilePath("nfd-test-worker.crt"), - KeyFile: data.FilePath("nfd-test-worker.key"), - Server: "localhost:8192", - ServerNameOverride: "nfd-test-master", - }, - Oneshot: true, - Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}}, + CaFile: data.FilePath("ca.crt"), + CertFile: data.FilePath("nfd-test-worker.crt"), + KeyFile: data.FilePath("nfd-test-worker.key"), + Server: "localhost:8192", + ServerNameOverride: "nfd-test-master", + Oneshot: true, + Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}}, } w, _ := worker.NewNfdWorker(&workerArgs) err := w.Run()