mirror of
https://github.com/kubernetes-sigs/node-feature-discovery.git
synced 2024-12-14 11:57:51 +00:00
Merge pull request #1609 from ozhuraki/worker-health
nfd-worker: Add liveness probe
This commit is contained in:
commit
0ad5e50f24
5 changed files with 82 additions and 1 deletions
|
@ -32,7 +32,8 @@ import (
|
|||
|
||||
const (
|
||||
// ProgramName is the canonical name of this program
|
||||
ProgramName = "nfd-worker"
|
||||
ProgramName = "nfd-worker"
|
||||
GrpcHealthPort = 8082
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
@ -79,6 +80,7 @@ func main() {
|
|||
utils.ConfigureGrpcKlog()
|
||||
|
||||
// Get new NfdWorker instance
|
||||
args.GrpcHealthPort = GrpcHealthPort
|
||||
instance, err := worker.NewNfdWorker(args)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to initialize NfdWorker instance")
|
||||
|
|
|
@ -19,6 +19,17 @@ spec:
|
|||
- name: nfd-worker
|
||||
image: gcr.io/k8s-staging-nfd/node-feature-discovery:master
|
||||
imagePullPolicy: Always
|
||||
livenessProbe:
|
||||
grpc:
|
||||
port: 8082
|
||||
initialDelaySeconds: 10
|
||||
periodSeconds: 10
|
||||
readinessProbe:
|
||||
grpc:
|
||||
port: 8082
|
||||
initialDelaySeconds: 5
|
||||
periodSeconds: 10
|
||||
failureThreshold: 10
|
||||
command:
|
||||
- "nfd-worker"
|
||||
args:
|
||||
|
|
|
@ -43,6 +43,17 @@ spec:
|
|||
{{- toYaml .Values.worker.securityContext | nindent 12 }}
|
||||
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
|
||||
imagePullPolicy: {{ .Values.image.pullPolicy }}
|
||||
livenessProbe:
|
||||
grpc:
|
||||
port: 8082
|
||||
initialDelaySeconds: 10
|
||||
periodSeconds: 10
|
||||
readinessProbe:
|
||||
grpc:
|
||||
port: 8082
|
||||
initialDelaySeconds: 5
|
||||
periodSeconds: 10
|
||||
failureThreshold: 10
|
||||
env:
|
||||
- name: NODE_NAME
|
||||
valueFrom:
|
||||
|
|
|
@ -394,6 +394,20 @@ worker:
|
|||
runAsNonRoot: true
|
||||
# runAsUser: 1000
|
||||
|
||||
# livenessProbe: {}
|
||||
## NOTE: Currently not configurable, defaults are provided for the sake of extra documentation.
|
||||
# grpc:
|
||||
# port: 8082
|
||||
# initialDelaySeconds: 10
|
||||
# periodSeconds: 10
|
||||
# readinessProbe: {}
|
||||
## NOTE: Currently not configurable, defaults are provided for the sake of extra documentation.
|
||||
# grpc:
|
||||
# port: 8082
|
||||
# initialDelaySeconds: 5
|
||||
# periodSeconds: 10
|
||||
# failureThreshold: 10
|
||||
|
||||
serviceAccount:
|
||||
# Specifies whether a service account should be created.
|
||||
# We create this by default to make it easier for downstream users to apply PodSecurityPolicies.
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
|
@ -33,6 +34,8 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/health"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/validation"
|
||||
|
@ -104,6 +107,7 @@ type Args struct {
|
|||
Server string
|
||||
ServerNameOverride string
|
||||
MetricsPort int
|
||||
GrpcHealthPort int
|
||||
|
||||
Overrides ConfigOverrideArgs
|
||||
}
|
||||
|
@ -124,6 +128,7 @@ type nfdWorker struct {
|
|||
config *NFDConfig
|
||||
kubernetesNamespace string
|
||||
grpcClient pb.LabelerClient
|
||||
healthServer *grpc.Server
|
||||
nfdClient *nfdclient.Clientset
|
||||
stop chan struct{} // channel for signaling stop
|
||||
featureSources []source.FeatureSource
|
||||
|
@ -187,6 +192,29 @@ func (i *infiniteTicker) Reset(d time.Duration) {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *nfdWorker) startGrpcHealthServer(errChan chan<- error) error {
|
||||
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", w.args.GrpcHealthPort))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to listen: %w", err)
|
||||
}
|
||||
|
||||
s := grpc.NewServer()
|
||||
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
|
||||
klog.InfoS("gRPC health server serving", "port", w.args.GrpcHealthPort)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
lis.Close()
|
||||
}()
|
||||
if err := s.Serve(lis); err != nil {
|
||||
errChan <- fmt.Errorf("gRPC health server exited with an error: %w", err)
|
||||
}
|
||||
klog.InfoS("gRPC health server stopped")
|
||||
}()
|
||||
w.healthServer = s
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run feature discovery.
|
||||
func (w *nfdWorker) runFeatureDiscovery() error {
|
||||
discoveryStart := time.Now()
|
||||
|
@ -262,8 +290,20 @@ func (w *nfdWorker) Run() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
grpcErr := make(chan error, 1)
|
||||
|
||||
// Start gRPC server for liveness probe (at this point we're "live")
|
||||
if w.args.GrpcHealthPort != 0 {
|
||||
if err := w.startGrpcHealthServer(grpcErr); err != nil {
|
||||
return fmt.Errorf("failed to start gRPC health server: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case err := <-grpcErr:
|
||||
return fmt.Errorf("error in serving gRPC: %w", err)
|
||||
|
||||
case <-labelTrigger.C:
|
||||
err = w.runFeatureDiscovery()
|
||||
if err != nil {
|
||||
|
@ -294,6 +334,9 @@ func (w *nfdWorker) Run() error {
|
|||
|
||||
case <-w.stop:
|
||||
klog.InfoS("shutting down nfd-worker")
|
||||
if w.healthServer != nil {
|
||||
w.healthServer.GracefulStop()
|
||||
}
|
||||
configWatch.Close()
|
||||
w.certWatch.Close()
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue