diff --git a/pkg/controllers/report/resource/controller.go b/pkg/controllers/report/resource/controller.go index 51f388e1c6..c26d0d30eb 100644 --- a/pkg/controllers/report/resource/controller.go +++ b/pkg/controllers/report/resource/controller.go @@ -22,6 +22,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + watchTools "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/workqueue" ) @@ -153,45 +155,50 @@ func (c *controller) updateDynamicWatchers(ctx context.Context) error { dynamicWatchers[gvr] = c.dynamicWatchers[gvr] delete(c.dynamicWatchers, gvr) } else { - logger.Info("start watcher ...", "gvr", gvr) - watchInterface, err := c.client.GetDynamicInterface().Resource(gvr).Watch(ctx, metav1.ListOptions{}) + hashes := map[types.UID]Resource{} + objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{}) if err != nil { - logger.Error(err, "failed to create watcher", "gvr", gvr) + logger.Error(err, "failed to list resources", "gvr", gvr) } else { - w := &watcher{ - watcher: watchInterface, - gvk: gvk, - hashes: map[types.UID]Resource{}, + resourceVersion := objs.GetResourceVersion() + for _, obj := range objs.Items { + uid := obj.GetUID() + hash := reportutils.CalculateResourceHash(obj) + hashes[uid] = Resource{ + Hash: hash, + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } + c.notify(uid, gvk, hashes[uid]) } - go func() { - gvr := gvr - defer logger.Info("watcher stopped") - for event := range watchInterface.ResultChan() { - switch event.Type { - case watch.Added: - c.updateHash(event.Object.(*unstructured.Unstructured), gvr) - case watch.Modified: - c.updateHash(event.Object.(*unstructured.Unstructured), gvr) - case watch.Deleted: - c.deleteHash(event.Object.(*unstructured.Unstructured), gvr) - } - } - }() - objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{}) + logger.Info("start watcher ...", "gvr", gvr, "resourceVersion", resourceVersion) + + watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { + return c.client.GetDynamicInterface().Resource(gvr).Watch(ctx, options) + } + watchInterface, err := watchTools.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc}) if err != nil { - logger.Error(err, "failed to list resources", "gvr", gvr) - watchInterface.Stop() + logger.Error(err, "failed to create watcher", "gvr", gvr) } else { - for _, obj := range objs.Items { - uid := obj.GetUID() - hash := reportutils.CalculateResourceHash(obj) - w.hashes[uid] = Resource{ - Hash: hash, - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - } - c.notify(uid, w.gvk, w.hashes[uid]) + w := &watcher{ + watcher: watchInterface, + gvk: gvk, + hashes: hashes, } + go func() { + gvr := gvr + defer logger.Info("watcher stopped") + for event := range watchInterface.ResultChan() { + switch event.Type { + case watch.Added: + c.updateHash(event.Object.(*unstructured.Unstructured), gvr) + case watch.Modified: + c.updateHash(event.Object.(*unstructured.Unstructured), gvr) + case watch.Deleted: + c.deleteHash(event.Object.(*unstructured.Unstructured), gvr) + } + } + }() dynamicWatchers[gvr] = w } }