diff --git a/cmd/nfd-worker/main.go b/cmd/nfd-worker/main.go index 91fbe78e6..4ce6131ce 100644 --- a/cmd/nfd-worker/main.go +++ b/cmd/nfd-worker/main.go @@ -32,7 +32,8 @@ import ( const ( // ProgramName is the canonical name of this program - ProgramName = "nfd-worker" + ProgramName = "nfd-worker" + GrpcHealthPort = 8082 ) func main() { @@ -81,6 +82,7 @@ func main() { utils.ConfigureGrpcKlog() // Get new NfdWorker instance + args.GrpcHealthPort = GrpcHealthPort instance, err := worker.NewNfdWorker(args) if err != nil { klog.ErrorS(err, "failed to initialize NfdWorker instance") diff --git a/deployment/base/worker-daemonset/worker-daemonset.yaml b/deployment/base/worker-daemonset/worker-daemonset.yaml index 2132498ce..c9eb432d0 100644 --- a/deployment/base/worker-daemonset/worker-daemonset.yaml +++ b/deployment/base/worker-daemonset/worker-daemonset.yaml @@ -19,6 +19,17 @@ spec: - name: nfd-worker image: gcr.io/k8s-staging-nfd/node-feature-discovery:master imagePullPolicy: Always + livenessProbe: + grpc: + port: 8082 + initialDelaySeconds: 10 + periodSeconds: 10 + readinessProbe: + grpc: + port: 8082 + initialDelaySeconds: 5 + periodSeconds: 10 + failureThreshold: 10 command: - "nfd-worker" args: diff --git a/deployment/helm/node-feature-discovery/templates/worker.yaml b/deployment/helm/node-feature-discovery/templates/worker.yaml index 45e41c19c..a8ace35e8 100644 --- a/deployment/helm/node-feature-discovery/templates/worker.yaml +++ b/deployment/helm/node-feature-discovery/templates/worker.yaml @@ -43,6 +43,17 @@ spec: {{- toYaml .Values.worker.securityContext | nindent 12 }} image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} + livenessProbe: + grpc: + port: 8082 + initialDelaySeconds: 10 + periodSeconds: 10 + readinessProbe: + grpc: + port: 8082 + initialDelaySeconds: 5 + periodSeconds: 10 + failureThreshold: 10 env: - name: NODE_NAME valueFrom: diff --git 
a/deployment/helm/node-feature-discovery/values.yaml b/deployment/helm/node-feature-discovery/values.yaml index dac6fcf4b..77d4f8038 100644 --- a/deployment/helm/node-feature-discovery/values.yaml +++ b/deployment/helm/node-feature-discovery/values.yaml @@ -394,6 +394,20 @@ worker: runAsNonRoot: true # runAsUser: 1000 + # livenessProbe: {} + ## NOTE: Currently not configurable, defaults are provided for the sake of extra documentation. + # grpc: + # port: 8082 + # initialDelaySeconds: 10 + # periodSeconds: 10 + # readinessProbe: {} + ## NOTE: Currently not configurable, defaults are provided for the sake of extra documentation. + # grpc: + # port: 8082 + # initialDelaySeconds: 5 + # periodSeconds: 10 + # failureThreshold: 10 + serviceAccount: # Specifies whether a service account should be created. # We create this by default to make it easier for downstream users to apply PodSecurityPolicies. diff --git a/pkg/nfd-worker/nfd-worker.go b/pkg/nfd-worker/nfd-worker.go index 17a67a09b..e2348edf5 100644 --- a/pkg/nfd-worker/nfd-worker.go +++ b/pkg/nfd-worker/nfd-worker.go @@ -21,6 +21,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + "net" "os" "path/filepath" "regexp" @@ -33,6 +34,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation" @@ -104,6 +107,7 @@ type Args struct { Server string ServerNameOverride string MetricsPort int + GrpcHealthPort int Overrides ConfigOverrideArgs } @@ -124,6 +128,7 @@ type nfdWorker struct { config *NFDConfig kubernetesNamespace string grpcClient pb.LabelerClient + healthServer *grpc.Server nfdClient *nfdclient.Clientset stop chan struct{} // channel for signaling stop featureSources []source.FeatureSource @@ -187,6 +192,29 @@ func (i *infiniteTicker) Reset(d time.Duration) { } } 
+// startGrpcHealthServer listens on args.GrpcHealthPort and serves the gRPC
+// health service in a goroutine; a Serve failure is sent to errChan.
+func (w *nfdWorker) startGrpcHealthServer(errChan chan<- error) error {
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", w.args.GrpcHealthPort))
+	if err != nil {
+		return fmt.Errorf("failed to listen: %w", err)
+	}
+
+	s := grpc.NewServer()
+	grpc_health_v1.RegisterHealthServer(s, health.NewServer())
+	klog.InfoS("gRPC health server serving", "port", w.args.GrpcHealthPort)
+
+	go func() {
+		// No explicit lis.Close(): grpc's Serve closes lis when it returns.
+		if err := s.Serve(lis); err != nil {
+			errChan <- fmt.Errorf("gRPC health server exited with an error: %w", err)
+		}
+		klog.InfoS("gRPC health server stopped")
+	}()
+	w.healthServer = s
+	return nil
+}
+
 // Run feature discovery.
 func (w *nfdWorker) runFeatureDiscovery() error {
 	discoveryStart := time.Now()
@@ -262,8 +290,20 @@ func (w *nfdWorker) Run() error {
 		return nil
 	}
 
+	grpcErr := make(chan error, 1)
+
+	// Start gRPC server for liveness probe (at this point we're "live")
+	if w.args.GrpcHealthPort != 0 {
+		if err := w.startGrpcHealthServer(grpcErr); err != nil {
+			return fmt.Errorf("failed to start gRPC health server: %w", err)
+		}
+	}
+
 	for {
 		select {
+		case err := <-grpcErr:
+			return fmt.Errorf("error in serving gRPC: %w", err)
+
 		case <-labelTrigger.C:
 			err = w.runFeatureDiscovery()
 			if err != nil {
@@ -294,6 +334,9 @@ func (w *nfdWorker) Run() error {
 
 		case <-w.stop:
 			klog.InfoS("shutting down nfd-worker")
+			if w.healthServer != nil {
+				w.healthServer.GracefulStop()
+			}
 			configWatch.Close()
 			w.certWatch.Close()
 			return nil