Mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git (synced 2025-03-28 18:57:10 +00:00)
nfd-updater: events: enable timer-only flow
The nfd-topology-updater has the state-directories notification mechanism enabled by default. In theory we can run with timer-based updates only, but if the option to disable the state-directories event source is given, the entire update mechanism is mistakenly disabled, including the timer-based updates. The two update mechanisms should be decoupled, so this PR changes the code to make sure the timer-based updates can be enabled on their own.

Signed-off-by: Francesco Romani <fromani@redhat.com>
This commit is contained in:
parent 48f37070ed
commit 000c919071
4 changed files with 64 additions and 16 deletions
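For context, the diff below relies on a common Go idiom: every event source feeds the same select loop, and a disabled source is simply left as a nil channel, which blocks forever and therefore never fires. The following is a minimal, self-contained sketch of that pattern under stated assumptions; the notifier type, field names, and string payloads are illustrative only and are not the actual nfd-topology-updater code.

package main

import (
	"fmt"
	"time"
)

// notifier is a simplified stand-in for an updater's event source: it can be
// driven by a timer, by state-directory (filesystem) events, or by both.
// A disabled source is simply left as a nil channel.
type notifier struct {
	interval time.Duration
	fsEvents <-chan string // nil when state-directory watching is disabled
}

func (n *notifier) run(dest chan<- string) {
	// With interval <= 0 the ticker channel also stays nil, so each source
	// can be switched off independently of the other.
	var timeEvents <-chan time.Time
	if n.interval > 0 {
		ticker := time.NewTicker(n.interval)
		defer ticker.Stop()
		timeEvents = ticker.C
	}

	// A receive from a nil channel blocks forever, so a disabled source
	// never fires and the select just waits on whatever is enabled.
	for {
		select {
		case <-timeEvents:
			dest <- "timer"
		case ev := <-n.fsEvents:
			dest <- "fs event: " + ev
		}
	}
}

func main() {
	dest := make(chan string)

	// Timer-only flow: no filesystem watcher, just periodic updates.
	n := &notifier{interval: 500 * time.Millisecond}
	go n.run(dest)

	for i := 0; i < 3; i++ {
		fmt.Println("update triggered by:", <-dest)
	}
}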
@@ -54,26 +54,35 @@ type Info struct {
 }
 
 func New(sleepInterval time.Duration, dest chan<- Info, kubeletStateDir string) (*Notifier, error) {
-	devicePluginsDir := path.Join(kubeletStateDir, devicePluginsDirName)
-	ch, err := createFSWatcherEvent([]string{kubeletStateDir, devicePluginsDir})
-	if err != nil {
-		return nil, err
-	}
-	return &Notifier{
+	notif := Notifier{
 		sleepInterval: sleepInterval,
 		dest:          dest,
-		fsEvent:       ch,
-	}, nil
-}
+	}
+
+	if kubeletStateDir != "" {
+		devicePluginsDir := path.Join(kubeletStateDir, devicePluginsDirName)
+		ch, err := createFSWatcherEvent([]string{kubeletStateDir, devicePluginsDir})
+		if err != nil {
+			return nil, err
+		}
+		notif.fsEvent = ch
+	}
+
+	return &notif, nil
+}
 
 func (n *Notifier) Run() {
-	timeEvents := make(<-chan time.Time)
+	var timeEvents <-chan time.Time
+
 	if n.sleepInterval > 0 {
 		ticker := time.NewTicker(n.sleepInterval)
 		defer ticker.Stop()
 		timeEvents = ticker.C
 	}
 
+	// it's safe to keep the channels we don't need nil:
+	// https://dave.cheney.net/2014/03/19/channel-axioms
+	// "A receive from a nil channel blocks forever"
 	for {
 		select {
 		case <-timeEvents:
@@ -85,13 +85,12 @@ type nfdTopologyUpdater struct {
 // NewTopologyUpdater creates a new NfdTopologyUpdater instance.
 func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) {
 	eventSource := make(chan kubeletnotifier.Info)
-	if args.KubeletStateDir != "" {
-		ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
-		if err != nil {
-			return nil, err
-		}
-		go ntf.Run()
-	}
+
+	ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
+	if err != nil {
+		return nil, err
+	}
+	go ntf.Run()
 
 	kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
 	if err != nil {
@@ -31,6 +31,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -385,6 +386,45 @@ excludeList:
 			}, 1*time.Minute, 10*time.Second).Should(BeFalse())
 		})
 	})
 
+	When("kubelet state monitoring disabled", func() {
+		BeforeEach(func(ctx context.Context) {
+			cfg, err := testutils.GetConfig()
+			Expect(err).ToNot(HaveOccurred())
+
+			kcfg := cfg.GetKubeletConfig()
+			By(fmt.Sprintf("Using config (%#v)", kcfg))
+			// we need a predictable and "low enough" sleep interval to make sure we wait enough time, and still we don't want to waste too much time waiting
+			podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage()), testpod.SpecWithContainerExtraArgs("-kubelet-state-dir=", "-sleep-interval=3s")}
+			topologyUpdaterDaemonSet = testds.NFDTopologyUpdater(kcfg, podSpecOpts...)
+		})
+
+		It("should still create or update CRs with periodic updates", func(ctx context.Context) {
+			// this is the simplest test. A more refined test would be check updates. We do like this to minimize flakes.
+			By("deleting existing CRs")
+
+			err := topologyClient.TopologyV1alpha2().NodeResourceTopologies().Delete(ctx, topologyUpdaterNode.Name, metav1.DeleteOptions{})
+			Expect(err).ToNot(HaveOccurred())
+
+			// need to set the polling interval explicitly and bigger than the sleep interval
+			By("checking the topology was recreated or updated")
+			Eventually(func() bool {
+				_, err = topologyClient.TopologyV1alpha2().NodeResourceTopologies().Get(ctx, topologyUpdaterNode.Name, metav1.GetOptions{})
+				if err != nil {
+					if apierrors.IsNotFound(err) {
+						framework.Logf("missing node topology resource for %q", topologyUpdaterNode.Name)
+						return true // intentionally retry
+					}
+					framework.Logf("failed to get the node topology resource: %v", err)
+					return false
+				}
+				return true
+			}).WithPolling(5 * time.Second).WithTimeout(30 * time.Second).Should(BeTrue())
+
+			framework.Logf("found NRT data for node %q!", topologyUpdaterNode.Name)
+		})
+	})
+
 	When("topology-updater configure to compute pod fingerprint", func() {
 		BeforeEach(func(ctx context.Context) {
 			cfg, err := testutils.GetConfig()
@@ -128,7 +128,7 @@ func GetNodeTopology(ctx context.Context, topologyClient *topologyclientset.Clie
 			return false
 		}
 		return true
-	}, time.Minute, 5*time.Second).Should(gomega.BeTrue())
+	}).WithPolling(5 * time.Second).WithTimeout(1 * time.Minute).Should(gomega.BeTrue())
 	return nodeTopology
 }