1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2025-03-16 21:38:23 +00:00

nfd-worker: split out gRPC connection handling

Refactor the worker code and split out gRPC client connection handling
into a separate base type. The intent is to promote re-usability of code
for other NFD clients, too.
This commit is contained in:
Markus Lehtonen 2021-07-08 11:02:39 +03:00
parent 150b8271fa
commit 112744bc50
5 changed files with 192 additions and 110 deletions

View file

@ -24,7 +24,7 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker" "sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker"
"sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/node-feature-discovery/pkg/version"
) )

139
pkg/nfd-client/base.go Normal file
View file

@ -0,0 +1,139 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nfdclient
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
)
// NfdClient defines a common interface for NFD clients.
type NfdClient interface {
Run() error
Stop()
}
// NfdBaseClient is a common base type for handling connections to nfd-master.
type NfdBaseClient struct {
args Args
clientConn *grpc.ClientConn
}
// Args holds the common command line arguments for all nfd clients.
type Args struct {
CaFile string
CertFile string
KeyFile string
Server string
ServerNameOverride string
Klog map[string]*utils.KlogFlagVal
}
var nodeName = os.Getenv("NODE_NAME")
// NodeName returns the name of the k8s node we're running on.
func NodeName() string { return nodeName }
// Create new NfdWorker instance.
func NewNfdBaseClient(args *Args) (NfdBaseClient, error) {
nfd := NfdBaseClient{args: *args}
// Check TLS related args
if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" {
if args.CertFile == "" {
return nfd, fmt.Errorf("--cert-file needs to be specified alongside --key-file and --ca-file")
}
if args.KeyFile == "" {
return nfd, fmt.Errorf("--key-file needs to be specified alongside --cert-file and --ca-file")
}
if args.CaFile == "" {
return nfd, fmt.Errorf("--ca-file needs to be specified alongside --cert-file and --key-file")
}
}
return nfd, nil
}
// ClientConn returns the grpc ClientConn object.
func (w *NfdBaseClient) ClientConn() *grpc.ClientConn { return w.clientConn }
// Connect creates a gRPC client connection to nfd-master.
func (w *NfdBaseClient) Connect() error {
// Check that if a connection already exists
if w.clientConn != nil {
return fmt.Errorf("client connection already exists")
}
// Dial and create a client
dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
dialOpts := []grpc.DialOption{grpc.WithBlock()}
if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" {
// Load client cert for client authentication
cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile)
if err != nil {
return fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA cert for server cert verification
caCert, err := ioutil.ReadFile(w.args.CaFile)
if err != nil {
return fmt.Errorf("failed to read root certificate file: %v", err)
}
caPool := x509.NewCertPool()
if ok := caPool.AppendCertsFromPEM(caCert); !ok {
return fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile)
}
// Create TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caPool,
ServerName: w.args.ServerNameOverride,
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
klog.Infof("connecting to nfd-master at %s ...", w.args.Server)
conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...)
if err != nil {
return err
}
w.clientConn = conn
return nil
}
// disconnect closes the connection to NFD master
func (w *NfdBaseClient) Disconnect() {
if w.clientConn != nil {
klog.Infof("closing connection to nfd-master ...")
w.clientConn.Close()
}
w.clientConn = nil
}

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package nfdworker package worker
import ( import (
"io/ioutil" "io/ioutil"

View file

@ -14,11 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package nfdworker package worker
import ( import (
"crypto/tls"
"crypto/x509"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -29,13 +27,12 @@ import (
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
pb "sigs.k8s.io/node-feature-discovery/pkg/labeler" pb "sigs.k8s.io/node-feature-discovery/pkg/labeler"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/node-feature-discovery/source" "sigs.k8s.io/node-feature-discovery/source"
@ -53,10 +50,6 @@ import (
"sigs.k8s.io/node-feature-discovery/source/usb" "sigs.k8s.io/node-feature-discovery/source/usb"
) )
var (
nodeName = os.Getenv("NODE_NAME")
)
// Global config // Global config
type NFDConfig struct { type NFDConfig struct {
Core coreConfig Core coreConfig
@ -78,14 +71,11 @@ type Labels map[string]string
// Command line arguments // Command line arguments
type Args struct { type Args struct {
CaFile string nfdclient.Args
CertFile string
KeyFile string ConfigFile string
ConfigFile string Oneshot bool
Options string Options string
Oneshot bool
Server string
ServerNameOverride string
Klog map[string]*utils.KlogFlagVal Klog map[string]*utils.KlogFlagVal
Overrides ConfigOverrideArgs Overrides ConfigOverrideArgs
@ -101,15 +91,11 @@ type ConfigOverrideArgs struct {
Sources *utils.StringSliceVal Sources *utils.StringSliceVal
} }
type NfdWorker interface {
Run() error
Stop()
}
type nfdWorker struct { type nfdWorker struct {
nfdclient.NfdBaseClient
args Args args Args
certWatch *utils.FsWatcher certWatch *utils.FsWatcher
clientConn *grpc.ClientConn
client pb.LabelerClient client pb.LabelerClient
configFilePath string configFilePath string
config *NFDConfig config *NFDConfig
@ -124,8 +110,15 @@ type duration struct {
} }
// Create new NfdWorker instance. // Create new NfdWorker instance.
func NewNfdWorker(args *Args) (NfdWorker, error) { func NewNfdWorker(args *Args) (nfdclient.NfdClient, error) {
base, err := nfdclient.NewNfdBaseClient(&args.Args)
if err != nil {
return nil, err
}
nfd := &nfdWorker{ nfd := &nfdWorker{
NfdBaseClient: base,
args: *args, args: *args,
config: &NFDConfig{}, config: &NFDConfig{},
realSources: []source.FeatureSource{ realSources: []source.FeatureSource{
@ -153,19 +146,6 @@ func NewNfdWorker(args *Args) (NfdWorker, error) {
nfd.configFilePath = filepath.Clean(args.ConfigFile) nfd.configFilePath = filepath.Clean(args.ConfigFile)
} }
// Check TLS related args
if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" {
if args.CertFile == "" {
return nfd, fmt.Errorf("--cert-file needs to be specified alongside --key-file and --ca-file")
}
if args.KeyFile == "" {
return nfd, fmt.Errorf("--key-file needs to be specified alongside --cert-file and --ca-file")
}
if args.CaFile == "" {
return nfd, fmt.Errorf("--ca-file needs to be specified alongside --cert-file and --key-file")
}
}
return nfd, nil return nfd, nil
} }
@ -184,7 +164,7 @@ func newDefaultConfig() *NFDConfig {
// one request if OneShot is set to 'true' in the worker args. // one request if OneShot is set to 'true' in the worker args.
func (w *nfdWorker) Run() error { func (w *nfdWorker) Run() error {
klog.Infof("Node Feature Discovery Worker %s", version.Get()) klog.Infof("Node Feature Discovery Worker %s", version.Get())
klog.Infof("NodeName: '%s'", nodeName) klog.Infof("NodeName: '%s'", nfdclient.NodeName())
// Create watcher for config file and read initial configuration // Create watcher for config file and read initial configuration
configWatch, err := utils.CreateFsWatcher(time.Second, w.configFilePath) configWatch, err := utils.CreateFsWatcher(time.Second, w.configFilePath)
@ -202,11 +182,11 @@ func (w *nfdWorker) Run() error {
} }
// Connect to NFD master // Connect to NFD master
err = w.connect() err = w.Connect()
if err != nil { if err != nil {
return fmt.Errorf("failed to connect: %v", err) return fmt.Errorf("failed to connect: %v", err)
} }
defer w.disconnect() defer w.Disconnect()
labelTrigger := time.After(0) labelTrigger := time.After(0)
for { for {
@ -238,9 +218,9 @@ func (w *nfdWorker) Run() error {
} }
// Manage connection to master // Manage connection to master
if w.config.Core.NoPublish { if w.config.Core.NoPublish {
w.disconnect() w.Disconnect()
} else if w.clientConn == nil { } else if w.ClientConn() == nil {
if err := w.connect(); err != nil { if err := w.Connect(); err != nil {
return err return err
} }
} }
@ -250,8 +230,8 @@ func (w *nfdWorker) Run() error {
case <-w.certWatch.Events: case <-w.certWatch.Events:
klog.Infof("TLS certificate update, renewing connection to nfd-master") klog.Infof("TLS certificate update, renewing connection to nfd-master")
w.disconnect() w.Disconnect()
if err := w.connect(); err != nil { if err := w.Connect(); err != nil {
return err return err
} }
@ -272,68 +252,27 @@ func (w *nfdWorker) Stop() {
} }
} }
// connect creates a client connection to the NFD master // Connect creates a client connection to the NFD master
func (w *nfdWorker) connect() error { func (w *nfdWorker) Connect() error {
// Return a dummy connection in case of dry-run // Return a dummy connection in case of dry-run
if w.config.Core.NoPublish { if w.config.Core.NoPublish {
return nil return nil
} }
// Check that if a connection already exists if err := w.NfdBaseClient.Connect(); err != nil {
if w.clientConn != nil {
return fmt.Errorf("client connection already exists")
}
// Dial and create a client
dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
dialOpts := []grpc.DialOption{grpc.WithBlock()}
if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" {
// Load client cert for client authentication
cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile)
if err != nil {
return fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA cert for server cert verification
caCert, err := ioutil.ReadFile(w.args.CaFile)
if err != nil {
return fmt.Errorf("failed to read root certificate file: %v", err)
}
caPool := x509.NewCertPool()
if ok := caPool.AppendCertsFromPEM(caCert); !ok {
return fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile)
}
// Create TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caPool,
ServerName: w.args.ServerNameOverride,
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
klog.Infof("connecting to nfd-master at %s ...", w.args.Server)
conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...)
if err != nil {
return err return err
} }
w.clientConn = conn
w.client = pb.NewLabelerClient(conn) w.client = pb.NewLabelerClient(w.ClientConn())
return nil return nil
} }
// disconnect closes the connection to NFD master // Disconnect closes the connection to NFD master
func (w *nfdWorker) disconnect() { func (w *nfdWorker) Disconnect() {
if w.clientConn != nil { w.NfdBaseClient.Disconnect()
klog.Infof("closing connection to nfd-master ...")
w.clientConn.Close()
}
w.clientConn = nil
w.client = nil w.client = nil
} }
func (c *coreConfig) sanitize() { func (c *coreConfig) sanitize() {
if c.SleepInterval.Duration > 0 && c.SleepInterval.Duration < time.Second { if c.SleepInterval.Duration > 0 && c.SleepInterval.Duration < time.Second {
klog.Warningf("too short sleep-intervall specified (%s), forcing to 1s", klog.Warningf("too short sleep-intervall specified (%s), forcing to 1s",
@ -551,7 +490,7 @@ func advertiseFeatureLabels(client pb.LabelerClient, labels Labels) error {
labelReq := pb.SetLabelsRequest{Labels: labels, labelReq := pb.SetLabelsRequest{Labels: labels,
NfdVersion: version.Get(), NfdVersion: version.Get(),
NodeName: nodeName} NodeName: nfdclient.NodeName()}
_, err := client.SetLabels(ctx, &labelReq) _, err := client.SetLabels(ctx, &labelReq)
if err != nil { if err != nil {
klog.Errorf("failed to set node labels: %v", err) klog.Errorf("failed to set node labels: %v", err)

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package nfdworker_test package worker_test
import ( import (
"fmt" "fmt"
@ -25,8 +25,9 @@ import (
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker"
master "sigs.k8s.io/node-feature-discovery/pkg/nfd-master" master "sigs.k8s.io/node-feature-discovery/pkg/nfd-master"
worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker"
"sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/test/data" "sigs.k8s.io/node-feature-discovery/test/data"
) )
@ -75,9 +76,9 @@ func teardownTest(ctx testContext) {
func TestNewNfdWorker(t *testing.T) { func TestNewNfdWorker(t *testing.T) {
Convey("When initializing new NfdWorker instance", t, func() { Convey("When initializing new NfdWorker instance", t, func() {
Convey("When one of --cert-file, --key-file or --ca-file is missing", func() { Convey("When one of --cert-file, --key-file or --ca-file is missing", func() {
_, err := worker.NewNfdWorker(&worker.Args{CertFile: "crt", KeyFile: "key"}) _, err := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", KeyFile: "key"}})
_, err2 := worker.NewNfdWorker(&worker.Args{KeyFile: "key", CaFile: "ca"}) _, err2 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{KeyFile: "key", CaFile: "ca"}})
_, err3 := worker.NewNfdWorker(&worker.Args{CertFile: "crt", CaFile: "ca"}) _, err3 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", CaFile: "ca"}})
Convey("An error should be returned", func() { Convey("An error should be returned", func() {
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(err2, ShouldNotBeNil) So(err2, ShouldNotBeNil)
@ -93,8 +94,9 @@ func TestRun(t *testing.T) {
Convey("When running nfd-worker against nfd-master", t, func() { Convey("When running nfd-worker against nfd-master", t, func() {
Convey("When publishing features from fake source", func() { Convey("When publishing features from fake source", func() {
args := &worker.Args{ args := &worker.Args{
Args: nfdclient.Args{
Server: "localhost:8192"},
Oneshot: true, Oneshot: true,
Server: "localhost:8192",
Overrides: worker.ConfigOverrideArgs{Sources: &utils.StringSliceVal{"fake"}}, Overrides: worker.ConfigOverrideArgs{Sources: &utils.StringSliceVal{"fake"}},
} }
fooasdf, _ := worker.NewNfdWorker(args) fooasdf, _ := worker.NewNfdWorker(args)
@ -118,13 +120,15 @@ func TestRunTls(t *testing.T) {
Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() { Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() {
Convey("When publishing features from fake source", func() { Convey("When publishing features from fake source", func() {
workerArgs := worker.Args{ workerArgs := worker.Args{
CaFile: data.FilePath("ca.crt"), Args: nfdclient.Args{
CertFile: data.FilePath("nfd-test-worker.crt"), CaFile: data.FilePath("ca.crt"),
KeyFile: data.FilePath("nfd-test-worker.key"), CertFile: data.FilePath("nfd-test-worker.crt"),
Oneshot: true, KeyFile: data.FilePath("nfd-test-worker.key"),
Server: "localhost:8192", Server: "localhost:8192",
ServerNameOverride: "nfd-test-master", ServerNameOverride: "nfd-test-master",
Overrides: worker.ConfigOverrideArgs{Sources: &utils.StringSliceVal{"fake"}}, },
Oneshot: true,
Overrides: worker.ConfigOverrideArgs{Sources: &utils.StringSliceVal{"fake"}},
} }
w, _ := worker.NewNfdWorker(&workerArgs) w, _ := worker.NewNfdWorker(&workerArgs)
err := w.Run() err := w.Run()