From dfc2596a229866bc8f1b0611bb2ba184cfa4d0e6 Mon Sep 17 00:00:00 2001 From: Markus Lehtonen Date: Tue, 16 Feb 2021 21:14:22 +0200 Subject: [PATCH] pkg/utils: generalize file watcher Add the capability to watch multiple files. Move it to a separate package in order to make it reusable. --- pkg/nfd-worker/nfd-worker.go | 66 +-------------- pkg/utils/fswatcher.go | 159 +++++++++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 63 deletions(-) create mode 100644 pkg/utils/fswatcher.go diff --git a/pkg/nfd-worker/nfd-worker.go b/pkg/nfd-worker/nfd-worker.go index 1f1349aa7..1f0a571e8 100644 --- a/pkg/nfd-worker/nfd-worker.go +++ b/pkg/nfd-worker/nfd-worker.go @@ -28,7 +28,6 @@ import ( "strings" "time" - "github.com/fsnotify/fsnotify" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -169,40 +168,6 @@ func NewNfdWorker(args *Args) (NfdWorker, error) { return nfd, nil } -func addConfigWatch(path string) (*fsnotify.Watcher, map[string]struct{}, error) { - paths := make(map[string]struct{}) - - // Create watcher - w, err := fsnotify.NewWatcher() - if err != nil { - return w, paths, fmt.Errorf("failed to create fsnotify watcher: %v", err) - } - - // Add watches for all directory components so that we catch e.g. renames - // upper in the tree - added := false - for p := path; ; p = filepath.Dir(p) { - - if err := w.Add(p); err != nil { - klog.V(1).Infof("failed to add fsnotify watch for %q: %v", p, err) - } else { - klog.V(1).Infof("added fsnotify watch %q", p) - added = true - } - - paths[p] = struct{}{} - if filepath.Dir(p) == p { - break - } - } - - if !added { - // Want to be sure that we watch something - return w, paths, fmt.Errorf("failed to add any watch") - } - return w, paths, nil -} - func newDefaultConfig() *NFDConfig { return &NFDConfig{ Core: coreConfig{ @@ -221,7 +186,7 @@ func (w *nfdWorker) Run() error { klog.Infof("NodeName: '%s'", nodeName) // Create watcher for config file and read initial configuration - configWatch, paths, err := addConfigWatch(w.configFilePath) + configWatch, err := utils.CreateFsWatcher(time.Second, w.configFilePath) if err != nil { return err } @@ -237,7 +202,6 @@ func (w *nfdWorker) Run() error { defer w.disconnect() labelTrigger := time.After(0) - var configTrigger <-chan time.Time for { select { case <-labelTrigger: @@ -260,32 +224,8 @@ func (w *nfdWorker) Run() error { labelTrigger = time.After(w.config.Core.SleepInterval.Duration) } - case e := <-configWatch.Events: - name := filepath.Clean(e.Name) - - // If any of our paths (directories or the file itself) change - if _, ok := paths[name]; ok { - klog.Infof("fsnotify event in %q detected, reconfiguring fsnotify and reloading configuration", name) - - // Blindly remove existing watch and add a new one - if err := configWatch.Close(); err != nil { - klog.Warningf("failed to close fsnotify watcher: %v", err) - } - configWatch, paths, err = addConfigWatch(w.configFilePath) - if err != nil { - return err - } - - // Rate limiter. In certain filesystem operations we get - // numerous events in quick succession and we only want one - // config re-load - configTrigger = time.After(time.Second) - } - - case e := <-configWatch.Errors: - klog.Errorf("config file watcher error: %v", e) - - case <-configTrigger: + case <-configWatch.Events: + klog.Infof("reloading configuration") if err := w.configure(w.configFilePath, w.args.Options); err != nil { return err } diff --git a/pkg/utils/fswatcher.go b/pkg/utils/fswatcher.go new file mode 100644 index 000000000..164381934 --- /dev/null +++ b/pkg/utils/fswatcher.go @@ -0,0 +1,159 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + "path/filepath" + "time" + + "github.com/fsnotify/fsnotify" + "k8s.io/klog/v2" +) + +// FsWatcher is a wrapper helper for watching files +type FsWatcher struct { + *fsnotify.Watcher + + Events chan struct{} + ratelimit time.Duration + names []string + paths map[string]struct{} +} + +// CreateFsWatcher creates a new FsWatcher +func CreateFsWatcher(ratelimit time.Duration, names ...string) (*FsWatcher, error) { + w := &FsWatcher{ + Events: make(chan struct{}), + names: names, + ratelimit: ratelimit, + } + + if err := w.reset(names...); err != nil { + return nil, err + } + + go w.watch() + + return w, nil +} + +// reset resets the file watches +func (w *FsWatcher) reset(names ...string) error { + if err := w.initWatcher(); err != nil { + return err + } + if err := w.add(names...); err != nil { + return err + } + + return nil +} + +func (w *FsWatcher) initWatcher() error { + if w.Watcher != nil { + if err := w.Watcher.Close(); err != nil { + return fmt.Errorf("failed to close fsnotify watcher: %v", err) + } + } + w.paths = make(map[string]struct{}) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + w.Watcher = nil + return fmt.Errorf("failed to create fsnotify watcher: %v", err) + } + w.Watcher = watcher + + return nil +} + +func (w *FsWatcher) add(names ...string) error { + for _, name := range names { + if name == "" { + continue + } + + added := false + // Add watches for all directory components so that we catch e.g. renames + // upper in the tree + for p := name; ; p = filepath.Dir(p) { + if _, ok := w.paths[p]; !ok { + if err := w.Add(p); err != nil { + klog.V(1).Infof("failed to add fsnotify watch for %q: %v", p, err) + } else { + klog.V(1).Infof("added fsnotify watch %q", p) + added = true + } + + w.paths[p] = struct{}{} + } else { + added = true + } + if filepath.Dir(p) == p { + break + } + } + if !added { + // Want to be sure that we watch something + return fmt.Errorf("failed to add any watch") + } + } + + return nil +} + +func (w *FsWatcher) watch() { + var ratelimiter <-chan time.Time + for { + select { + case e, ok := <-w.Watcher.Events: + // Watcher has been closed + if !ok { + klog.Infof("watcher closed") + return + } + + // If any of our paths change + name := filepath.Clean(e.Name) + if _, ok := w.paths[filepath.Clean(name)]; ok { + klog.V(2).Infof("fsnotify %s event in %q detected", e, name) + + // Rate limiter. In certain filesystem operations we get + // numerous events in quick succession + ratelimiter = time.After(w.ratelimit) + } + + case e, ok := <-w.Watcher.Errors: + // Watcher has been closed + if !ok { + klog.Infof("watcher closed") + return + } + klog.Warningf("fswatcher error event detected: %v", e) + + case <-ratelimiter: + // Blindly remove existing watch and add a new one + if err := w.reset(w.names...); err != nil { + klog.Errorf("%v, re-trying in 60 seconds...", err) + ratelimiter = time.After(60 * time.Second) + } + + w.Events <- struct{}{} + } + } +}