mirror of
https://github.com/kubernetes-sigs/node-feature-discovery.git
synced 2025-03-05 08:17:04 +00:00
pkg/utils: generalize file watcher
Add the capability to watch multiple files. Move it to a separate package in order to make it reusable.
This commit is contained in:
parent
4c5285d9ed
commit
dfc2596a22
2 changed files with 162 additions and 63 deletions
|
@ -28,7 +28,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
@ -169,40 +168,6 @@ func NewNfdWorker(args *Args) (NfdWorker, error) {
|
|||
return nfd, nil
|
||||
}
|
||||
|
||||
func addConfigWatch(path string) (*fsnotify.Watcher, map[string]struct{}, error) {
|
||||
paths := make(map[string]struct{})
|
||||
|
||||
// Create watcher
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return w, paths, fmt.Errorf("failed to create fsnotify watcher: %v", err)
|
||||
}
|
||||
|
||||
// Add watches for all directory components so that we catch e.g. renames
|
||||
// upper in the tree
|
||||
added := false
|
||||
for p := path; ; p = filepath.Dir(p) {
|
||||
|
||||
if err := w.Add(p); err != nil {
|
||||
klog.V(1).Infof("failed to add fsnotify watch for %q: %v", p, err)
|
||||
} else {
|
||||
klog.V(1).Infof("added fsnotify watch %q", p)
|
||||
added = true
|
||||
}
|
||||
|
||||
paths[p] = struct{}{}
|
||||
if filepath.Dir(p) == p {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !added {
|
||||
// Want to be sure that we watch something
|
||||
return w, paths, fmt.Errorf("failed to add any watch")
|
||||
}
|
||||
return w, paths, nil
|
||||
}
|
||||
|
||||
func newDefaultConfig() *NFDConfig {
|
||||
return &NFDConfig{
|
||||
Core: coreConfig{
|
||||
|
@ -221,7 +186,7 @@ func (w *nfdWorker) Run() error {
|
|||
klog.Infof("NodeName: '%s'", nodeName)
|
||||
|
||||
// Create watcher for config file and read initial configuration
|
||||
configWatch, paths, err := addConfigWatch(w.configFilePath)
|
||||
configWatch, err := utils.CreateFsWatcher(time.Second, w.configFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -237,7 +202,6 @@ func (w *nfdWorker) Run() error {
|
|||
defer w.disconnect()
|
||||
|
||||
labelTrigger := time.After(0)
|
||||
var configTrigger <-chan time.Time
|
||||
for {
|
||||
select {
|
||||
case <-labelTrigger:
|
||||
|
@ -260,32 +224,8 @@ func (w *nfdWorker) Run() error {
|
|||
labelTrigger = time.After(w.config.Core.SleepInterval.Duration)
|
||||
}
|
||||
|
||||
case e := <-configWatch.Events:
|
||||
name := filepath.Clean(e.Name)
|
||||
|
||||
// If any of our paths (directories or the file itself) change
|
||||
if _, ok := paths[name]; ok {
|
||||
klog.Infof("fsnotify event in %q detected, reconfiguring fsnotify and reloading configuration", name)
|
||||
|
||||
// Blindly remove existing watch and add a new one
|
||||
if err := configWatch.Close(); err != nil {
|
||||
klog.Warningf("failed to close fsnotify watcher: %v", err)
|
||||
}
|
||||
configWatch, paths, err = addConfigWatch(w.configFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Rate limiter. In certain filesystem operations we get
|
||||
// numerous events in quick succession and we only want one
|
||||
// config re-load
|
||||
configTrigger = time.After(time.Second)
|
||||
}
|
||||
|
||||
case e := <-configWatch.Errors:
|
||||
klog.Errorf("config file watcher error: %v", e)
|
||||
|
||||
case <-configTrigger:
|
||||
case <-configWatch.Events:
|
||||
klog.Infof("reloading configuration")
|
||||
if err := w.configure(w.configFilePath, w.args.Options); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
159
pkg/utils/fswatcher.go
Normal file
159
pkg/utils/fswatcher.go
Normal file
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// FsWatcher is a wrapper helper for watching files
|
||||
type FsWatcher struct {
|
||||
*fsnotify.Watcher
|
||||
|
||||
Events chan struct{}
|
||||
ratelimit time.Duration
|
||||
names []string
|
||||
paths map[string]struct{}
|
||||
}
|
||||
|
||||
// CreateFsWatcher creates a new FsWatcher
|
||||
func CreateFsWatcher(ratelimit time.Duration, names ...string) (*FsWatcher, error) {
|
||||
w := &FsWatcher{
|
||||
Events: make(chan struct{}),
|
||||
names: names,
|
||||
ratelimit: ratelimit,
|
||||
}
|
||||
|
||||
if err := w.reset(names...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go w.watch()
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// reset resets the file watches
|
||||
func (w *FsWatcher) reset(names ...string) error {
|
||||
if err := w.initWatcher(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.add(names...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *FsWatcher) initWatcher() error {
|
||||
if w.Watcher != nil {
|
||||
if err := w.Watcher.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close fsnotify watcher: %v", err)
|
||||
}
|
||||
}
|
||||
w.paths = make(map[string]struct{})
|
||||
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
w.Watcher = nil
|
||||
return fmt.Errorf("failed to create fsnotify watcher: %v", err)
|
||||
}
|
||||
w.Watcher = watcher
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *FsWatcher) add(names ...string) error {
|
||||
for _, name := range names {
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
added := false
|
||||
// Add watches for all directory components so that we catch e.g. renames
|
||||
// upper in the tree
|
||||
for p := name; ; p = filepath.Dir(p) {
|
||||
if _, ok := w.paths[p]; !ok {
|
||||
if err := w.Add(p); err != nil {
|
||||
klog.V(1).Infof("failed to add fsnotify watch for %q: %v", p, err)
|
||||
} else {
|
||||
klog.V(1).Infof("added fsnotify watch %q", p)
|
||||
added = true
|
||||
}
|
||||
|
||||
w.paths[p] = struct{}{}
|
||||
} else {
|
||||
added = true
|
||||
}
|
||||
if filepath.Dir(p) == p {
|
||||
break
|
||||
}
|
||||
}
|
||||
if !added {
|
||||
// Want to be sure that we watch something
|
||||
return fmt.Errorf("failed to add any watch")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *FsWatcher) watch() {
|
||||
var ratelimiter <-chan time.Time
|
||||
for {
|
||||
select {
|
||||
case e, ok := <-w.Watcher.Events:
|
||||
// Watcher has been closed
|
||||
if !ok {
|
||||
klog.Infof("watcher closed")
|
||||
return
|
||||
}
|
||||
|
||||
// If any of our paths change
|
||||
name := filepath.Clean(e.Name)
|
||||
if _, ok := w.paths[filepath.Clean(name)]; ok {
|
||||
klog.V(2).Infof("fsnotify %s event in %q detected", e, name)
|
||||
|
||||
// Rate limiter. In certain filesystem operations we get
|
||||
// numerous events in quick succession
|
||||
ratelimiter = time.After(w.ratelimit)
|
||||
}
|
||||
|
||||
case e, ok := <-w.Watcher.Errors:
|
||||
// Watcher has been closed
|
||||
if !ok {
|
||||
klog.Infof("watcher closed")
|
||||
return
|
||||
}
|
||||
klog.Warningf("fswatcher error event detected: %v", e)
|
||||
|
||||
case <-ratelimiter:
|
||||
// Blindly remove existing watch and add a new one
|
||||
if err := w.reset(w.names...); err != nil {
|
||||
klog.Errorf("%v, re-trying in 60 seconds...", err)
|
||||
ratelimiter = time.After(60 * time.Second)
|
||||
}
|
||||
|
||||
w.Events <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue