From 112744bc509cad943b17120fedc2ee7ad03d1e2f Mon Sep 17 00:00:00 2001 From: Markus Lehtonen Date: Thu, 8 Jul 2021 11:02:39 +0300 Subject: [PATCH] nfd-worker: split out gRPC connection handling Refactor the worker code and split out gRPC client connection handling into a separate base type. The intent is to promote re-usability of code for other NFD clients, too. --- cmd/nfd-worker/main.go | 2 +- pkg/nfd-client/base.go | 139 ++++++++++++++++++ .../worker}/nfd-worker-internal_test.go | 2 +- .../worker}/nfd-worker.go | 129 +++++----------- .../worker}/nfd-worker_test.go | 30 ++-- 5 files changed, 192 insertions(+), 110 deletions(-) create mode 100644 pkg/nfd-client/base.go rename pkg/{nfd-worker => nfd-client/worker}/nfd-worker-internal_test.go (99%) rename pkg/{nfd-worker => nfd-client/worker}/nfd-worker.go (81%) rename pkg/{nfd-worker => nfd-client/worker}/nfd-worker_test.go (78%) diff --git a/cmd/nfd-worker/main.go b/cmd/nfd-worker/main.go index 1ba9b2360..9eae719a0 100644 --- a/cmd/nfd-worker/main.go +++ b/cmd/nfd-worker/main.go @@ -24,7 +24,7 @@ import ( "k8s.io/klog/v2" - worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker" + "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/version" ) diff --git a/pkg/nfd-client/base.go b/pkg/nfd-client/base.go new file mode 100644 index 000000000..40e8da4d8 --- /dev/null +++ b/pkg/nfd-client/base.go @@ -0,0 +1,139 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nfdclient + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "os" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "k8s.io/klog/v2" + + "sigs.k8s.io/node-feature-discovery/pkg/utils" +) + +// NfdClient defines a common interface for NFD clients. +type NfdClient interface { + Run() error + Stop() +} + +// NfdBaseClient is a common base type for handling connections to nfd-master. +type NfdBaseClient struct { + args Args + clientConn *grpc.ClientConn +} + +// Args holds the common command line arguments for all nfd clients. +type Args struct { + CaFile string + CertFile string + KeyFile string + Server string + ServerNameOverride string + + Klog map[string]*utils.KlogFlagVal +} + +var nodeName = os.Getenv("NODE_NAME") + +// NodeName returns the name of the k8s node we're running on. +func NodeName() string { return nodeName } + +// Create new NfdWorker instance. +func NewNfdBaseClient(args *Args) (NfdBaseClient, error) { + nfd := NfdBaseClient{args: *args} + + // Check TLS related args + if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" { + if args.CertFile == "" { + return nfd, fmt.Errorf("--cert-file needs to be specified alongside --key-file and --ca-file") + } + if args.KeyFile == "" { + return nfd, fmt.Errorf("--key-file needs to be specified alongside --cert-file and --ca-file") + } + if args.CaFile == "" { + return nfd, fmt.Errorf("--ca-file needs to be specified alongside --cert-file and --key-file") + } + } + + return nfd, nil +} + +// ClientConn returns the grpc ClientConn object. +func (w *NfdBaseClient) ClientConn() *grpc.ClientConn { return w.clientConn } + +// Connect creates a gRPC client connection to nfd-master. +func (w *NfdBaseClient) Connect() error { + // Check that if a connection already exists + if w.clientConn != nil { + return fmt.Errorf("client connection already exists") + } + + // Dial and create a client + dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + dialOpts := []grpc.DialOption{grpc.WithBlock()} + if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" { + // Load client cert for client authentication + cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile) + if err != nil { + return fmt.Errorf("failed to load client certificate: %v", err) + } + // Load CA cert for server cert verification + caCert, err := ioutil.ReadFile(w.args.CaFile) + if err != nil { + return fmt.Errorf("failed to read root certificate file: %v", err) + } + caPool := x509.NewCertPool() + if ok := caPool.AppendCertsFromPEM(caCert); !ok { + return fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile) + } + // Create TLS config + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caPool, + ServerName: w.args.ServerNameOverride, + } + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + } else { + dialOpts = append(dialOpts, grpc.WithInsecure()) + } + klog.Infof("connecting to nfd-master at %s ...", w.args.Server) + conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...) + if err != nil { + return err + } + w.clientConn = conn + + return nil +} + +// disconnect closes the connection to NFD master +func (w *NfdBaseClient) Disconnect() { + if w.clientConn != nil { + klog.Infof("closing connection to nfd-master ...") + w.clientConn.Close() + } + w.clientConn = nil +} diff --git a/pkg/nfd-worker/nfd-worker-internal_test.go b/pkg/nfd-client/worker/nfd-worker-internal_test.go similarity index 99% rename from pkg/nfd-worker/nfd-worker-internal_test.go rename to pkg/nfd-client/worker/nfd-worker-internal_test.go index 2d2b13a6b..4bc3f2091 100644 --- a/pkg/nfd-worker/nfd-worker-internal_test.go +++ b/pkg/nfd-client/worker/nfd-worker-internal_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package nfdworker +package worker import ( "io/ioutil" diff --git a/pkg/nfd-worker/nfd-worker.go b/pkg/nfd-client/worker/nfd-worker.go similarity index 81% rename from pkg/nfd-worker/nfd-worker.go rename to pkg/nfd-client/worker/nfd-worker.go index 298d4a6a1..a1888f4f2 100644 --- a/pkg/nfd-worker/nfd-worker.go +++ b/pkg/nfd-client/worker/nfd-worker.go @@ -14,11 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. */ -package nfdworker +package worker import ( - "crypto/tls" - "crypto/x509" "encoding/json" "fmt" "io/ioutil" @@ -29,13 +27,12 @@ import ( "time" "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/klog/v2" "sigs.k8s.io/yaml" pb "sigs.k8s.io/node-feature-discovery/pkg/labeler" + nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/node-feature-discovery/source" @@ -53,10 +50,6 @@ import ( "sigs.k8s.io/node-feature-discovery/source/usb" ) -var ( - nodeName = os.Getenv("NODE_NAME") -) - // Global config type NFDConfig struct { Core coreConfig @@ -78,14 +71,11 @@ type Labels map[string]string // Command line arguments type Args struct { - CaFile string - CertFile string - KeyFile string - ConfigFile string - Options string - Oneshot bool - Server string - ServerNameOverride string + nfdclient.Args + + ConfigFile string + Oneshot bool + Options string Klog map[string]*utils.KlogFlagVal Overrides ConfigOverrideArgs @@ -101,15 +91,11 @@ type ConfigOverrideArgs struct { Sources *utils.StringSliceVal } -type NfdWorker interface { - Run() error - Stop() -} - type nfdWorker struct { + nfdclient.NfdBaseClient + args Args certWatch *utils.FsWatcher - clientConn *grpc.ClientConn client pb.LabelerClient configFilePath string config *NFDConfig @@ -124,8 +110,15 @@ type duration struct { } // Create new NfdWorker instance. -func NewNfdWorker(args *Args) (NfdWorker, error) { +func NewNfdWorker(args *Args) (nfdclient.NfdClient, error) { + base, err := nfdclient.NewNfdBaseClient(&args.Args) + if err != nil { + return nil, err + } + nfd := &nfdWorker{ + NfdBaseClient: base, + args: *args, config: &NFDConfig{}, realSources: []source.FeatureSource{ @@ -153,19 +146,6 @@ func NewNfdWorker(args *Args) (NfdWorker, error) { nfd.configFilePath = filepath.Clean(args.ConfigFile) } - // Check TLS related args - if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" { - if args.CertFile == "" { - return nfd, fmt.Errorf("--cert-file needs to be specified alongside --key-file and --ca-file") - } - if args.KeyFile == "" { - return nfd, fmt.Errorf("--key-file needs to be specified alongside --cert-file and --ca-file") - } - if args.CaFile == "" { - return nfd, fmt.Errorf("--ca-file needs to be specified alongside --cert-file and --key-file") - } - } - return nfd, nil } @@ -184,7 +164,7 @@ func newDefaultConfig() *NFDConfig { // one request if OneShot is set to 'true' in the worker args. func (w *nfdWorker) Run() error { klog.Infof("Node Feature Discovery Worker %s", version.Get()) - klog.Infof("NodeName: '%s'", nodeName) + klog.Infof("NodeName: '%s'", nfdclient.NodeName()) // Create watcher for config file and read initial configuration configWatch, err := utils.CreateFsWatcher(time.Second, w.configFilePath) @@ -202,11 +182,11 @@ func (w *nfdWorker) Run() error { } // Connect to NFD master - err = w.connect() + err = w.Connect() if err != nil { return fmt.Errorf("failed to connect: %v", err) } - defer w.disconnect() + defer w.Disconnect() labelTrigger := time.After(0) for { @@ -238,9 +218,9 @@ func (w *nfdWorker) Run() error { } // Manage connection to master if w.config.Core.NoPublish { - w.disconnect() - } else if w.clientConn == nil { - if err := w.connect(); err != nil { + w.Disconnect() + } else if w.ClientConn() == nil { + if err := w.Connect(); err != nil { return err } } @@ -250,8 +230,8 @@ func (w *nfdWorker) Run() error { case <-w.certWatch.Events: klog.Infof("TLS certificate update, renewing connection to nfd-master") - w.disconnect() - if err := w.connect(); err != nil { + w.Disconnect() + if err := w.Connect(); err != nil { return err } @@ -272,68 +252,27 @@ func (w *nfdWorker) Stop() { } } -// connect creates a client connection to the NFD master -func (w *nfdWorker) connect() error { +// Connect creates a client connection to the NFD master +func (w *nfdWorker) Connect() error { // Return a dummy connection in case of dry-run if w.config.Core.NoPublish { return nil } - // Check that if a connection already exists - if w.clientConn != nil { - return fmt.Errorf("client connection already exists") - } - - // Dial and create a client - dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - dialOpts := []grpc.DialOption{grpc.WithBlock()} - if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" { - // Load client cert for client authentication - cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile) - if err != nil { - return fmt.Errorf("failed to load client certificate: %v", err) - } - // Load CA cert for server cert verification - caCert, err := ioutil.ReadFile(w.args.CaFile) - if err != nil { - return fmt.Errorf("failed to read root certificate file: %v", err) - } - caPool := x509.NewCertPool() - if ok := caPool.AppendCertsFromPEM(caCert); !ok { - return fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile) - } - // Create TLS config - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: caPool, - ServerName: w.args.ServerNameOverride, - } - dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) - } else { - dialOpts = append(dialOpts, grpc.WithInsecure()) - } - klog.Infof("connecting to nfd-master at %s ...", w.args.Server) - conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...) - if err != nil { + if err := w.NfdBaseClient.Connect(); err != nil { return err } - w.clientConn = conn - w.client = pb.NewLabelerClient(conn) + + w.client = pb.NewLabelerClient(w.ClientConn()) return nil } -// disconnect closes the connection to NFD master -func (w *nfdWorker) disconnect() { - if w.clientConn != nil { - klog.Infof("closing connection to nfd-master ...") - w.clientConn.Close() - } - w.clientConn = nil +// Disconnect closes the connection to NFD master +func (w *nfdWorker) Disconnect() { + w.NfdBaseClient.Disconnect() w.client = nil } - func (c *coreConfig) sanitize() { if c.SleepInterval.Duration > 0 && c.SleepInterval.Duration < time.Second { klog.Warningf("too short sleep-intervall specified (%s), forcing to 1s", @@ -551,7 +490,7 @@ func advertiseFeatureLabels(client pb.LabelerClient, labels Labels) error { labelReq := pb.SetLabelsRequest{Labels: labels, NfdVersion: version.Get(), - NodeName: nodeName} + NodeName: nfdclient.NodeName()} _, err := client.SetLabels(ctx, &labelReq) if err != nil { klog.Errorf("failed to set node labels: %v", err) diff --git a/pkg/nfd-worker/nfd-worker_test.go b/pkg/nfd-client/worker/nfd-worker_test.go similarity index 78% rename from pkg/nfd-worker/nfd-worker_test.go rename to pkg/nfd-client/worker/nfd-worker_test.go index 2f7b4ea9b..41733cf4f 100644 --- a/pkg/nfd-worker/nfd-worker_test.go +++ b/pkg/nfd-client/worker/nfd-worker_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package nfdworker_test +package worker_test import ( "fmt" @@ -25,8 +25,9 @@ import ( . "github.com/smartystreets/goconvey/convey" + nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client" + "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker" master "sigs.k8s.io/node-feature-discovery/pkg/nfd-master" - worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/test/data" ) @@ -75,9 +76,9 @@ func teardownTest(ctx testContext) { func TestNewNfdWorker(t *testing.T) { Convey("When initializing new NfdWorker instance", t, func() { Convey("When one of --cert-file, --key-file or --ca-file is missing", func() { - _, err := worker.NewNfdWorker(&worker.Args{CertFile: "crt", KeyFile: "key"}) - _, err2 := worker.NewNfdWorker(&worker.Args{KeyFile: "key", CaFile: "ca"}) - _, err3 := worker.NewNfdWorker(&worker.Args{CertFile: "crt", CaFile: "ca"}) + _, err := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", KeyFile: "key"}}) + _, err2 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{KeyFile: "key", CaFile: "ca"}}) + _, err3 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", CaFile: "ca"}}) Convey("An error should be returned", func() { So(err, ShouldNotBeNil) So(err2, ShouldNotBeNil) @@ -93,8 +94,9 @@ func TestRun(t *testing.T) { Convey("When running nfd-worker against nfd-master", t, func() { Convey("When publishing features from fake source", func() { args := &worker.Args{ + Args: nfdclient.Args{ + Server: "localhost:8192"}, Oneshot: true, - Server: "localhost:8192", Overrides: worker.ConfigOverrideArgs{Sources: &utils.StringSliceVal{"fake"}}, } fooasdf, _ := worker.NewNfdWorker(args) @@ -118,13 +120,15 @@ func TestRunTls(t *testing.T) { Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() { Convey("When publishing features from fake source", func() { workerArgs := worker.Args{ - CaFile: data.FilePath("ca.crt"), - CertFile: data.FilePath("nfd-test-worker.crt"), - KeyFile: data.FilePath("nfd-test-worker.key"), - Oneshot: true, - Server: "localhost:8192", - ServerNameOverride: "nfd-test-master", - Overrides: worker.ConfigOverrideArgs{Sources: &utils.StringSliceVal{"fake"}}, + Args: nfdclient.Args{ + CaFile: data.FilePath("ca.crt"), + CertFile: data.FilePath("nfd-test-worker.crt"), + KeyFile: data.FilePath("nfd-test-worker.key"), + Server: "localhost:8192", + ServerNameOverride: "nfd-test-master", + }, + Oneshot: true, + Overrides: worker.ConfigOverrideArgs{Sources: &utils.StringSliceVal{"fake"}}, } w, _ := worker.NewNfdWorker(&workerArgs) err := w.Run()