1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2024-12-14 11:57:51 +00:00

Merge pull request #1643 from ozhuraki/topology-health

nfd-topology-updater: Add liveness probe
This commit is contained in:
Kubernetes Prow Robot 2024-04-03 07:34:08 -07:00 committed by GitHub
commit fcf819ad9f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 82 additions and 0 deletions

View file

@ -36,6 +36,7 @@ const (
// ProgramName is the canonical name of this program
ProgramName = "nfd-topology-updater"
kubeletSecurePort = 10250
GrpcHealthPort = 8082
)
var DefaultKubeletStateDir = path.Join(string(hostpath.VarDir), "lib", "kubelet")
@ -54,6 +55,7 @@ func main() {
utils.ConfigureGrpcKlog()
// Get new TopologyUpdater instance
args.GrpcHealthPort = GrpcHealthPort
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs)
if err != nil {
klog.ErrorS(err, "failed to initialize topology updater instance")

View file

@ -19,6 +19,17 @@ spec:
- name: nfd-topology-updater
image: gcr.io/k8s-staging-nfd/node-feature-discovery:master
imagePullPolicy: Always
livenessProbe:
grpc:
port: 8082
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
grpc:
port: 8082
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 10
command:
- "nfd-topology-updater"
args: []

View file

@ -41,6 +41,17 @@ spec:
- name: topology-updater
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: "{{ .Values.image.pullPolicy }}"
livenessProbe:
grpc:
port: 8082
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
grpc:
port: 8082
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 10
env:
- name: NODE_NAME
valueFrom:

View file

@ -475,6 +475,20 @@ topologyUpdater:
readOnlyRootFilesystem: true
runAsUser: 0
# livenessProbe: {}
## NOTE: Currently not configurable; the default values below are shown for documentation purposes only.
# grpc:
# port: 8082
# initialDelaySeconds: 10
# periodSeconds: 10
# readinessProbe: {}
## NOTE: Currently not configurable; the default values below are shown for documentation purposes only.
# grpc:
# port: 8082
# initialDelaySeconds: 5
# periodSeconds: 10
# failureThreshold: 10
resources:
limits:
cpu: 100m

View file

@ -18,12 +18,16 @@ package nfdtopologyupdater
import (
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -58,6 +62,7 @@ type Args struct {
KubeConfigFile string
ConfigFile string
KubeletStateDir string
GrpcHealthPort int
Klog map[string]*utils.KlogFlagVal
}
@ -85,6 +90,7 @@ type nfdTopologyUpdater struct {
ownerRefs []metav1.OwnerReference
k8sClient k8sclient.Interface
kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error)
healthServer *grpc.Server
}
// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
@ -128,6 +134,29 @@ func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, err
return klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope, nil
}
// startGrpcHealthServer brings up a gRPC server that exposes the gRPC health
// service on the TCP port configured in w.args.GrpcHealthPort.
//
// The listening socket is opened synchronously, so a failure to bind is
// reported through the returned error. Serving itself happens in a background
// goroutine: if Serve returns a non-nil error it is wrapped and sent on
// errChan (the send blocks unless the channel has free buffer space or a
// receiver is ready). The created server is stored in w.healthServer so that
// the caller can stop it later.
func (w *nfdTopologyUpdater) startGrpcHealthServer(errChan chan<- error) error {
	port := w.args.GrpcHealthPort

	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	srv := grpc.NewServer()
	grpc_health_v1.RegisterHealthServer(srv, health.NewServer())
	klog.InfoS("gRPC health server serving", "port", port)

	// Keep a handle on the server before serving starts; the serving
	// goroutine itself never touches w.healthServer.
	w.healthServer = srv

	go func() {
		// Release the listening socket once serving has ended.
		defer listener.Close()

		serveErr := srv.Serve(listener)
		if serveErr != nil {
			errChan <- fmt.Errorf("gRPC health server exited with an error: %w", serveErr)
		}
		klog.InfoS("gRPC health server stopped")
	}()

	return nil
}
// Run starts the nfdTopologyUpdater. It returns if a fatal error is
// encountered, or after one request if OneShot is set to 'true' in the
// updater args.
func (w *nfdTopologyUpdater) Run() error {
@ -187,8 +216,20 @@ func (w *nfdTopologyUpdater) Run() error {
return fmt.Errorf("failed to obtain node resource information: %w", err)
}
grpcErr := make(chan error, 1)
// Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 {
if err := w.startGrpcHealthServer(grpcErr); err != nil {
return fmt.Errorf("failed to start gRPC health server: %w", err)
}
}
for {
select {
case err := <-grpcErr:
return fmt.Errorf("error in serving gRPC: %w", err)
case info := <-w.eventSource:
klog.V(4).InfoS("event received, scanning...", "event", info.Event)
scanResponse, err := resScan.Scan()
@ -217,6 +258,9 @@ func (w *nfdTopologyUpdater) Run() error {
case <-w.stop:
klog.InfoS("shutting down nfd-topology-updater")
if w.healthServer != nil {
w.healthServer.GracefulStop()
}
return nil
}
}