1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2024-12-14 11:57:51 +00:00

Merge pull request #1020 from marquiz/devel/worker-refactor

worker: move code
This commit is contained in:
Kubernetes Prow Robot 2022-12-27 00:45:34 -08:00 committed by GitHub
commit 8eb6640754
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 179 deletions

View file

@ -24,7 +24,7 @@ import (
"k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker"
worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)

View file

@ -1,136 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nfdclient
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
)
// NfdClient defines a common interface for NFD clients.
type NfdClient interface {
Run() error
Stop()
}
// NfdBaseClient is a common base type for handling connections to nfd-master.
type NfdBaseClient struct {
args Args
clientConn *grpc.ClientConn
}
// Args holds the common command line arguments for all nfd clients.
type Args struct {
CaFile string
CertFile string
KeyFile string
Kubeconfig string
Server string
ServerNameOverride string
Klog map[string]*utils.KlogFlagVal
}
// NewNfdBaseClient creates a new NfdBaseClient instance.
func NewNfdBaseClient(args *Args) (NfdBaseClient, error) {
nfd := NfdBaseClient{args: *args}
// Check TLS related args
if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" {
if args.CertFile == "" {
return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file")
}
if args.KeyFile == "" {
return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file")
}
if args.CaFile == "" {
return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file")
}
}
return nfd, nil
}
// ClientConn returns the grpc ClientConn object.
func (w *NfdBaseClient) ClientConn() *grpc.ClientConn { return w.clientConn }
// Connect creates a gRPC client connection to nfd-master.
func (w *NfdBaseClient) Connect() error {
// Check that if a connection already exists
if w.clientConn != nil {
return fmt.Errorf("client connection already exists")
}
// Dial and create a client
dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
dialOpts := []grpc.DialOption{grpc.WithBlock()}
if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" {
// Load client cert for client authentication
cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile)
if err != nil {
return fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA cert for server cert verification
caCert, err := os.ReadFile(w.args.CaFile)
if err != nil {
return fmt.Errorf("failed to read root certificate file: %v", err)
}
caPool := x509.NewCertPool()
if ok := caPool.AppendCertsFromPEM(caCert); !ok {
return fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile)
}
// Create TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caPool,
ServerName: w.args.ServerNameOverride,
MinVersion: tls.VersionTLS13,
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
klog.Infof("connecting to nfd-master at %s ...", w.args.Server)
conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...)
if err != nil {
return err
}
w.clientConn = conn
return nil
}
// Disconnect closes the connection to NFD master
func (w *NfdBaseClient) Disconnect() {
if w.clientConn != nil {
klog.Infof("closing connection to nfd-master ...")
w.clientConn.Close()
}
w.clientConn = nil
}

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package worker
package nfdworker
import (
"os"

View file

@ -14,9 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package worker
package nfdworker
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"os"
@ -27,6 +29,9 @@ import (
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/klog/v2"
@ -38,7 +43,6 @@ import (
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/pkg/apis/nfd/v1alpha1"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned"
pb "sigs.k8s.io/node-feature-discovery/pkg/labeler"
clientcommon "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/node-feature-discovery/source"
@ -57,6 +61,12 @@ import (
_ "sigs.k8s.io/node-feature-discovery/source/usb"
)
// NfdWorker is the interface for nfd-worker daemon
type NfdWorker interface {
Run() error
Stop()
}
// NFDConfig contains the configuration settings of NfdWorker.
type NFDConfig struct {
Core coreConfig
@ -80,14 +90,18 @@ type Labels map[string]string
// Args are the command line arguments of NfdWorker.
type Args struct {
clientcommon.Args
CaFile string
CertFile string
ConfigFile string
EnableNodeFeatureApi bool
KeyFile string
Klog map[string]*utils.KlogFlagVal
Kubeconfig string
Oneshot bool
Options string
Server string
ServerNameOverride string
Klog map[string]*utils.KlogFlagVal
Overrides ConfigOverrideArgs
}
@ -100,10 +114,9 @@ type ConfigOverrideArgs struct {
}
type nfdWorker struct {
clientcommon.NfdBaseClient
args Args
certWatch *utils.FsWatcher
clientConn *grpc.ClientConn
configFilePath string
config *NFDConfig
kubernetesNamespace string
@ -119,21 +132,27 @@ type duration struct {
}
// NewNfdWorker creates new NfdWorker instance.
func NewNfdWorker(args *Args) (clientcommon.NfdClient, error) {
base, err := clientcommon.NewNfdBaseClient(&args.Args)
if err != nil {
return nil, err
}
func NewNfdWorker(args *Args) (NfdWorker, error) {
nfd := &nfdWorker{
NfdBaseClient: base,
args: *args,
config: &NFDConfig{},
kubernetesNamespace: utils.GetKubernetesNamespace(),
stop: make(chan struct{}, 1),
}
// Check TLS related args
if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" {
if args.CertFile == "" {
return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file")
}
if args.KeyFile == "" {
return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file")
}
if args.CaFile == "" {
return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file")
}
}
if args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(args.ConfigFile)
}
@ -175,7 +194,7 @@ func (w *nfdWorker) Run() error {
return err
}
defer w.GrpcDisconnect()
defer w.grpcDisconnect()
labelTrigger := time.After(0)
for {
@ -214,7 +233,7 @@ func (w *nfdWorker) Run() error {
}
// Manage connection to master
if w.config.Core.NoPublish || !w.args.EnableNodeFeatureApi {
w.GrpcDisconnect()
w.grpcDisconnect()
}
// Always re-label after a re-config event. This way the new config
@ -223,7 +242,7 @@ func (w *nfdWorker) Run() error {
case <-w.certWatch.Events:
klog.Infof("TLS certificate update, renewing connection to nfd-master")
w.GrpcDisconnect()
w.grpcDisconnect()
case <-w.stop:
klog.Infof("shutting down nfd-worker")
@ -249,18 +268,60 @@ func (w *nfdWorker) getGrpcClient() (pb.LabelerClient, error) {
return w.grpcClient, nil
}
if err := w.NfdBaseClient.Connect(); err != nil {
return nil, err
// Check that if a connection already exists
if w.clientConn != nil {
return nil, fmt.Errorf("client connection already exists")
}
w.grpcClient = pb.NewLabelerClient(w.ClientConn())
// Dial and create a client
dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
dialOpts := []grpc.DialOption{grpc.WithBlock()}
if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" {
// Load client cert for client authentication
cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA cert for server cert verification
caCert, err := os.ReadFile(w.args.CaFile)
if err != nil {
return nil, fmt.Errorf("failed to read root certificate file: %v", err)
}
caPool := x509.NewCertPool()
if ok := caPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile)
}
// Create TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caPool,
ServerName: w.args.ServerNameOverride,
MinVersion: tls.VersionTLS13,
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
klog.Infof("connecting to nfd-master at %s ...", w.args.Server)
conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...)
if err != nil {
return nil, err
}
w.clientConn = conn
w.grpcClient = pb.NewLabelerClient(w.clientConn)
return w.grpcClient, nil
}
// GrpcDisconnect closes the gRPC connection to NFD master
func (w *nfdWorker) GrpcDisconnect() {
w.NfdBaseClient.Disconnect()
// grpcDisconnect closes the gRPC connection to NFD master
func (w *nfdWorker) grpcDisconnect() {
if w.clientConn != nil {
klog.Infof("closing connection to nfd-master ...")
w.clientConn.Close()
}
w.clientConn = nil
w.grpcClient = nil
}
func (c *coreConfig) sanitize() {

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package worker_test
package nfdworker_test
import (
"fmt"
@ -25,9 +25,8 @@ import (
. "github.com/smartystreets/goconvey/convey"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker"
master "sigs.k8s.io/node-feature-discovery/pkg/nfd-master"
worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/test/data"
)
@ -76,9 +75,9 @@ func teardownTest(ctx testContext) {
func TestNewNfdWorker(t *testing.T) {
Convey("When initializing new NfdWorker instance", t, func() {
Convey("When one of -cert-file, -key-file or -ca-file is missing", func() {
_, err := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", KeyFile: "key"}})
_, err2 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{KeyFile: "key", CaFile: "ca"}})
_, err3 := worker.NewNfdWorker(&worker.Args{Args: nfdclient.Args{CertFile: "crt", CaFile: "ca"}})
_, err := worker.NewNfdWorker(&worker.Args{CertFile: "crt", KeyFile: "key"})
_, err2 := worker.NewNfdWorker(&worker.Args{KeyFile: "key", CaFile: "ca"})
_, err3 := worker.NewNfdWorker(&worker.Args{CertFile: "crt", CaFile: "ca"})
Convey("An error should be returned", func() {
So(err, ShouldNotBeNil)
So(err2, ShouldNotBeNil)
@ -94,8 +93,7 @@ func TestRun(t *testing.T) {
Convey("When running nfd-worker against nfd-master", t, func() {
Convey("When publishing features from fake source", func() {
args := &worker.Args{
Args: nfdclient.Args{
Server: "localhost:8192"},
Server: "localhost:8192",
Oneshot: true,
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
}
@ -120,15 +118,13 @@ func TestRunTls(t *testing.T) {
Convey("When running nfd-worker against nfd-master with mutual TLS auth enabled", t, func() {
Convey("When publishing features from fake source", func() {
workerArgs := worker.Args{
Args: nfdclient.Args{
CaFile: data.FilePath("ca.crt"),
CertFile: data.FilePath("nfd-test-worker.crt"),
KeyFile: data.FilePath("nfd-test-worker.key"),
Server: "localhost:8192",
ServerNameOverride: "nfd-test-master",
},
Oneshot: true,
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
CaFile: data.FilePath("ca.crt"),
CertFile: data.FilePath("nfd-test-worker.crt"),
KeyFile: data.FilePath("nfd-test-worker.key"),
Server: "localhost:8192",
ServerNameOverride: "nfd-test-master",
Oneshot: true,
Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}},
}
w, _ := worker.NewNfdWorker(&workerArgs)
err := w.Run()