diff --git a/pkg/controllers/report/resource/controller.go b/pkg/controllers/report/resource/controller.go index f6cff23998..e9af0ef374 100644 --- a/pkg/controllers/report/resource/controller.go +++ b/pkg/controllers/report/resource/controller.go @@ -2,6 +2,7 @@ package resource import ( "context" + "errors" "sync" "time" @@ -132,6 +133,64 @@ func (c *controller) AddEventHandler(eventHandler EventHandler) { } } +func (c *controller) startWatcher(ctx context.Context, logger logr.Logger, gvr schema.GroupVersionResource, gvk schema.GroupVersionKind) (*watcher, error) { + hashes := map[types.UID]Resource{} + objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{}) + if err != nil { + logger.Error(err, "failed to list resources") + return nil, err + } else { + resourceVersion := objs.GetResourceVersion() + for _, obj := range objs.Items { + uid := obj.GetUID() + hash := reportutils.CalculateResourceHash(obj) + hashes[uid] = Resource{ + Hash: hash, + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } + c.notify(Added, uid, gvk, hashes[uid]) + } + logger := logger.WithValues("resourceVersion", resourceVersion) + logger.Info("start watcher ...") + watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { + logger.Info("creating watcher...") + watch, err := c.client.GetDynamicInterface().Resource(gvr).Watch(context.Background(), options) + if err != nil { + logger.Error(err, "failed to watch") + } + return watch, err + } + watchInterface, err := watchTools.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc}) + if err != nil { + logger.Error(err, "failed to create watcher") + return nil, err + } else { + w := &watcher{ + watcher: watchInterface, + gvk: gvk, + hashes: hashes, + } + go func(gvr schema.GroupVersionResource) { + defer logger.Info("watcher stopped") + for event := range watchInterface.ResultChan() { + switch event.Type { + case watch.Added: + c.updateHash(Added, event.Object.(*unstructured.Unstructured), gvr) + case watch.Modified: + c.updateHash(Modified, event.Object.(*unstructured.Unstructured), gvr) + case watch.Deleted: + c.deleteHash(event.Object.(*unstructured.Unstructured), gvr) + case watch.Error: + logger.Error(errors.New("watch error event received"), "watch error event received", "event", event.Object) + } + } + }(gvr) + return w, nil + } + } +} + func (c *controller) updateDynamicWatchers(ctx context.Context) error { c.lock.Lock() defer c.lock.Unlock() @@ -171,56 +230,10 @@ func (c *controller) updateDynamicWatchers(ctx context.Context) error { dynamicWatchers[gvr] = c.dynamicWatchers[gvr] delete(c.dynamicWatchers, gvr) } else { - hashes := map[types.UID]Resource{} - objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{}) - if err != nil { - logger.Error(err, "failed to list resources") + if w, err := c.startWatcher(ctx, logger, gvr, gvk); err != nil { + logger.Error(err, "failed to start watcher") } else { - resourceVersion := objs.GetResourceVersion() - for _, obj := range objs.Items { - uid := obj.GetUID() - hash := reportutils.CalculateResourceHash(obj) - hashes[uid] = Resource{ - Hash: hash, - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - } - c.notify(Added, uid, gvk, hashes[uid]) - } - logger := logger.WithValues("resourceVersion", resourceVersion) - logger.Info("start watcher ...") - watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { - watch, err := c.client.GetDynamicInterface().Resource(gvr).Watch(context.Background(), options) - if err != nil { - logger.Error(err, "failed to watch") - } - return watch, err - } - watchInterface, err := watchTools.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc}) - if err != nil { - logger.Error(err, "failed to create watcher") - } else { - w := &watcher{ - watcher: watchInterface, - gvk: gvk, - hashes: hashes, - } - go func() { - gvr := gvr - defer logger.Info("watcher stopped") - for event := range watchInterface.ResultChan() { - switch event.Type { - case watch.Added: - c.updateHash(Added, event.Object.(*unstructured.Unstructured), gvr) - case watch.Modified: - c.updateHash(Modified, event.Object.(*unstructured.Unstructured), gvr) - case watch.Deleted: - c.deleteHash(event.Object.(*unstructured.Unstructured), gvr) - } - } - }() - dynamicWatchers[gvr] = w - } + dynamicWatchers[gvr] = w } } }