1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2025-03-16 21:38:23 +00:00

worker: move code

Simplify code bu dropping the unnecessary base client package.
This commit is contained in:
Markus Lehtonen 2022-12-23 10:45:07 +02:00
parent d97297ee8c
commit 1026d91d12
5 changed files with 100 additions and 179 deletions

View file

@ -24,7 +24,7 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker" worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker"
"sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/node-feature-discovery/pkg/version"
) )

View file

@ -1,136 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nfdclient
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
)
// NfdClient defines a common interface for NFD clients.
type NfdClient interface {
Run() error
Stop()
}
// NfdBaseClient is a common base type for handling connections to nfd-master.
type NfdBaseClient struct {
args Args
clientConn *grpc.ClientConn
}
// Args holds the common command line arguments for all nfd clients.
type Args struct {
CaFile string
CertFile string
KeyFile string
Kubeconfig string
Server string
ServerNameOverride string
Klog map[string]*utils.KlogFlagVal
}
// NewNfdBaseClient creates a new NfdBaseClient instance.
func NewNfdBaseClient(args *Args) (NfdBaseClient, error) {
nfd := NfdBaseClient{args: *args}
// Check TLS related args
if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" {
if args.CertFile == "" {
return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file")
}
if args.KeyFile == "" {
return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file")
}
if args.CaFile == "" {
return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file")
}
}
return nfd, nil
}
// ClientConn returns the grpc ClientConn object.
func (w *NfdBaseClient) ClientConn() *grpc.ClientConn { return w.clientConn }
// Connect creates a gRPC client connection to nfd-master.
func (w *NfdBaseClient) Connect() error {
// Check that if a connection already exists
if w.clientConn != nil {
return fmt.Errorf("client connection already exists")
}
// Dial and create a client
dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
dialOpts := []grpc.DialOption{grpc.WithBlock()}
if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" {
// Load client cert for client authentication
cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile)
if err != nil {
return fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA cert for server cert verification
caCert, err := os.ReadFile(w.args.CaFile)
if err != nil {
return fmt.Errorf("failed to read root certificate file: %v", err)
}
caPool := x509.NewCertPool()
if ok := caPool.AppendCertsFromPEM(caCert); !ok {
return fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile)
}
// Create TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caPool,
ServerName: w.args.ServerNameOverride,
MinVersion: tls.VersionTLS13,
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
klog.Infof("connecting to nfd-master at %s ...", w.args.Server)
conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...)
if err != nil {
return err
}
w.clientConn = conn
return nil
}
// Disconnect closes the connection to NFD master
func (w *NfdBaseClient) Disconnect() {
if w.clientConn != nil {
klog.Infof("closing connection to nfd-master ...")
w.clientConn.Close()
}
w.clientConn = nil
}

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package worker package nfdworker
import ( import (
"os" "os"

View file

@ -14,9 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package worker package nfdworker
import ( import (
"crypto/tls"
"crypto/x509"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@ -27,6 +29,9 @@ import (
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -38,7 +43,6 @@ import (
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/pkg/apis/nfd/v1alpha1" nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/pkg/apis/nfd/v1alpha1"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned" nfdclient "sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned"
pb "sigs.k8s.io/node-feature-discovery/pkg/labeler" pb "sigs.k8s.io/node-feature-discovery/pkg/labeler"
clientcommon "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/node-feature-discovery/source" "sigs.k8s.io/node-feature-discovery/source"
@ -57,6 +61,12 @@ import (
_ "sigs.k8s.io/node-feature-discovery/source/usb" _ "sigs.k8s.io/node-feature-discovery/source/usb"
) )
// NfdWorker is the interface for nfd-worker daemon
type NfdWorker interface {
Run() error
Stop()
}
// NFDConfig contains the configuration settings of NfdWorker. // NFDConfig contains the configuration settings of NfdWorker.
type NFDConfig struct { type NFDConfig struct {
Core coreConfig Core coreConfig
@ -80,14 +90,18 @@ type Labels map[string]string
// Args are the command line arguments of NfdWorker. // Args are the command line arguments of NfdWorker.
type Args struct { type Args struct {
clientcommon.Args CaFile string
CertFile string
ConfigFile string ConfigFile string
EnableNodeFeatureApi bool EnableNodeFeatureApi bool
KeyFile string
Klog map[string]*utils.KlogFlagVal
Kubeconfig string
Oneshot bool Oneshot bool
Options string Options string
Server string
ServerNameOverride string
Klog map[string]*utils.KlogFlagVal
Overrides ConfigOverrideArgs Overrides ConfigOverrideArgs
} }
@ -100,10 +114,9 @@ type ConfigOverrideArgs struct {
} }
type nfdWorker struct { type nfdWorker struct {
clientcommon.NfdBaseClient
args Args args Args
certWatch *utils.FsWatcher certWatch *utils.FsWatcher
clientConn *grpc.ClientConn
configFilePath string configFilePath string
config *NFDConfig config *NFDConfig
kubernetesNamespace string kubernetesNamespace string
@ -119,21 +132,27 @@ type duration struct {
} }
// NewNfdWorker creates new NfdWorker instance. // NewNfdWorker creates new NfdWorker instance.
func NewNfdWorker(args *Args) (clientcommon.NfdClient, error) { func NewNfdWorker(args *Args) (NfdWorker, error) {
base, err := clientcommon.NewNfdBaseClient(&args.Args)
if err != nil {
return nil, err
}
nfd := &nfdWorker{ nfd := &nfdWorker{
NfdBaseClient: base,
args: *args, args: *args,
config: &NFDConfig{}, config: &NFDConfig{},
kubernetesNamespace: utils.GetKubernetesNamespace(), kubernetesNamespace: utils.GetKubernetesNamespace(),
stop: make(chan struct{}, 1), stop: make(chan struct{}, 1),
} }
// Check TLS related args
if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" {
if args.CertFile == "" {
return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file")
}
if args.KeyFile == "" {
return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file")
}
if args.CaFile == "" {
return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file")
}
}
if args.ConfigFile != "" { if args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(args.ConfigFile) nfd.configFilePath = filepath.Clean(args.ConfigFile)
} }
@ -175,7 +194,7 @@ func (w *nfdWorker) Run() error {
return err return err
} }
defer w.GrpcDisconnect() defer w.grpcDisconnect()
labelTrigger := time.After(0) labelTrigger := time.After(0)
for { for {
@ -214,7 +233,7 @@ func (w *nfdWorker) Run() error {
} }
// Manage connection to master // Manage connection to master
if w.config.Core.NoPublish || !w.args.EnableNodeFeatureApi { if w.config.Core.NoPublish || !w.args.EnableNodeFeatureApi {
w.GrpcDisconnect() w.grpcDisconnect()
} }
// Always re-label after a re-config event. This way the new config // Always re-label after a re-config event. This way the new config
@ -223,7 +242,7 @@ func (w *nfdWorker) Run() error {
case <-w.certWatch.Events: case <-w.certWatch.Events:
klog.Infof("TLS certificate update, renewing connection to nfd-master") klog.Infof("TLS certificate update, renewing connection to nfd-master")
w.GrpcDisconnect() w.grpcDisconnect()
case <-w.stop: case <-w.stop:
klog.Infof("shutting down nfd-worker") klog.Infof("shutting down nfd-worker")
@ -249,18 +268,60 @@ func (w *nfdWorker) getGrpcClient() (pb.LabelerClient, error) {
return w.grpcClient, nil return w.grpcClient, nil
} }
if err := w.NfdBaseClient.Connect(); err != nil { // Check that if a connection already exists
return nil, err if w.clientConn != nil {
return nil, fmt.Errorf("client connection already exists")
} }
w.grpcClient = pb.NewLabelerClient(w.ClientConn()) // Dial and create a client
dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
dialOpts := []grpc.DialOption{grpc.WithBlock()}
if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" {
// Load client cert for client authentication
cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA cert for server cert verification
caCert, err := os.ReadFile(w.args.CaFile)
if err != nil {
return nil, fmt.Errorf("failed to read root certificate file: %v", err)
}
caPool := x509.NewCertPool()
if ok := caPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile)
}
// Create TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caPool,
ServerName: w.args.ServerNameOverride,
MinVersion: tls.VersionTLS13,
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
klog.Infof("connecting to nfd-master at %s ...", w.args.Server)
conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...)
if err != nil {
return nil, err
}
w.clientConn = conn
w.grpcClient = pb.NewLabelerClient(w.clientConn)
return w.grpcClient, nil return w.grpcClient, nil
} }
// GrpcDisconnect closes the gRPC connection to NFD master // grpcDisconnect closes the gRPC connection to NFD master
func (w *nfdWorker) GrpcDisconnect() { func (w *nfdWorker) grpcDisconnect() {
w.NfdBaseClient.Disconnect() if w.clientConn != nil {
klog.Infof("closing connection to nfd-master ...")
w.clientConn.Close()
}
w.clientConn = nil
w.grpcClient = nil w.grpcClient = nil
} }
func (c *coreConfig) sanitize() { func (c *coreConfig) sanitize() {

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package worker_test package nfdworker_test
import ( import (
"fmt" "fmt"
@ -25,9 +25,8 @@ import (
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker"
master "sigs.k8s.io/node-feature-discovery/pkg/nfd-master" master "sigs.k8s.io/node-feature-discovery/pkg/nfd-master"
worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker"
"sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/test/data" "sigs.k8s.io/node-feature-discovery/test/data"
) )
@ -76,9 +75,9 @@ func teardownTest(ctx testContext) {
func TestNewNfdWorker(t *testing.T) { func TestNewNfdWorker(t *testing.T) {
Convey("When initializing new NfdWorker instance", t, func() { Convey("When initializing new NfdWorker instance", t, func() {
Convey("When one of -cert-file, -key-file or -ca-file is missing", func() { Convey("When one of -cert-file, -key-file or -ca-file is missing", func() {
_, err := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", KeyFile: "key"}}) _, err := worker.NewNfdWorker(&worker.Args{CertFile: "crt", KeyFile: "key"})
_, err2 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{KeyFile: "key", CaFile: "ca"}}) _, err2 := worker.NewNfdWorker(&worker.Args{KeyFile: "key", CaFile: "ca"})
_, err3 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", CaFile: "ca"}}) _, err3 := worker.NewNfdWorker(&worker.Args{CertFile: "crt", CaFile: "ca"})
Convey("An error should be returned", func() { Convey("An error should be returned", func() {
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(err2, ShouldNotBeNil) So(err2, ShouldNotBeNil)
@ -94,8 +93,7 @@ func TestRun(t *testing.T) {
Convey("When running nfd-worker against nfd-master", t, func() { Convey("When running nfd-worker against nfd-master", t, func() {
Convey("When publishing features from fake source", func() { Convey("When publishing features from fake source", func() {
args := &worker.Args{ args := &worker.Args{
Args: nfdclient.Args{ Server: "localhost:8192",
Server: "localhost:8192"},
Oneshot: true, Oneshot: true,
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}}, Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
} }
@ -120,13 +118,11 @@ func TestRunTls(t *testing.T) {
Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() { Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() {
Convey("When publishing features from fake source", func() { Convey("When publishing features from fake source", func() {
workerArgs := worker.Args{ workerArgs := worker.Args{
Args: nfdclient.Args{
CaFile: data.FilePath("ca.crt"), CaFile: data.FilePath("ca.crt"),
CertFile: data.FilePath("nfd-test-worker.crt"), CertFile: data.FilePath("nfd-test-worker.crt"),
KeyFile: data.FilePath("nfd-test-worker.key"), KeyFile: data.FilePath("nfd-test-worker.key"),
Server: "localhost:8192", Server: "localhost:8192",
ServerNameOverride: "nfd-test-master", ServerNameOverride: "nfd-test-master",
},
Oneshot: true, Oneshot: true,
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}}, Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
} }