1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2024-12-14 11:57:51 +00:00

nfd-worker: Add liveness probe

Signed-off-by: Oleg Zhurakivskyy <oleg.zhurakivskyy@intel.com>
This commit is contained in:
Oleg Zhurakivskyy 2024-03-11 17:40:44 +02:00
parent 35cc81969f
commit 8b63d17af7
5 changed files with 82 additions and 1 deletions

View file

@ -33,6 +33,7 @@ import (
const (
// ProgramName is the canonical name of this program
ProgramName = "nfd-worker"
GrpcHealthPort = 8082
)
func main() {
@ -81,6 +82,7 @@ func main() {
utils.ConfigureGrpcKlog()
// Get new NfdWorker instance
args.GrpcHealthPort = GrpcHealthPort
instance, err := worker.NewNfdWorker(args)
if err != nil {
klog.ErrorS(err, "failed to initialize NfdWorker instance")

View file

@ -19,6 +19,17 @@ spec:
- name: nfd-worker
image: gcr.io/k8s-staging-nfd/node-feature-discovery:master
imagePullPolicy: Always
livenessProbe:
grpc:
port: 8082
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
grpc:
port: 8082
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 10
command:
- "nfd-worker"
args:

View file

@ -43,6 +43,17 @@ spec:
{{- toYaml .Values.worker.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
livenessProbe:
grpc:
port: 8082
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
grpc:
port: 8082
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 10
env:
- name: NODE_NAME
valueFrom:

View file

@ -394,6 +394,20 @@ worker:
runAsNonRoot: true
# runAsUser: 1000
# livenessProbe: {}
## NOTE: Currently not configurable, defaults are provided for the sake of extra documentation.
# grpc:
# port: 8082
# initialDelaySeconds: 10
# periodSeconds: 10
# readinessProbe: {}
## NOTE: Currently not configurable, defaults are provided for the sake of extra documentation.
# grpc:
# port: 8082
# initialDelaySeconds: 5
# periodSeconds: 10
# failureThreshold: 10
serviceAccount:
# Specifies whether a service account should be created.
# We create this by default to make it easier for downstream users to apply PodSecurityPolicies.

View file

@ -21,6 +21,7 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"regexp"
@ -33,6 +34,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
@ -104,6 +107,7 @@ type Args struct {
Server string
ServerNameOverride string
MetricsPort int
GrpcHealthPort int
Overrides ConfigOverrideArgs
}
@ -124,6 +128,7 @@ type nfdWorker struct {
config *NFDConfig
kubernetesNamespace string
grpcClient pb.LabelerClient
healthServer *grpc.Server
nfdClient *nfdclient.Clientset
stop chan struct{} // channel for signaling stop
featureSources []source.FeatureSource
@ -187,6 +192,29 @@ func (i *infiniteTicker) Reset(d time.Duration) {
}
}
func (w *nfdWorker) startGrpcHealthServer(errChan chan<- error) error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", w.args.GrpcHealthPort))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
s := grpc.NewServer()
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
klog.InfoS("gRPC health server serving", "port", w.args.GrpcHealthPort)
go func() {
defer func() {
lis.Close()
}()
if err := s.Serve(lis); err != nil {
errChan <- fmt.Errorf("gRPC health server exited with an error: %w", err)
}
klog.InfoS("gRPC health server stopped")
}()
w.healthServer = s
return nil
}
// Run feature discovery.
func (w *nfdWorker) runFeatureDiscovery() error {
discoveryStart := time.Now()
@ -262,8 +290,20 @@ func (w *nfdWorker) Run() error {
return nil
}
grpcErr := make(chan error, 1)
// Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 {
if err := w.startGrpcHealthServer(grpcErr); err != nil {
return fmt.Errorf("failed to start gRPC health server: %w", err)
}
}
for {
select {
case err := <-grpcErr:
return fmt.Errorf("error in serving gRPC: %w", err)
case <-labelTrigger.C:
err = w.runFeatureDiscovery()
if err != nil {
@ -294,6 +334,9 @@ func (w *nfdWorker) Run() error {
case <-w.stop:
klog.InfoS("shutting down nfd-worker")
if w.healthServer != nil {
w.healthServer.GracefulStop()
}
configWatch.Close()
w.certWatch.Close()
return nil