mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-28 02:18:15 +00:00
fix: bug in report resource watcher (#5525)
* fix: bug in report resource watcher Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * fix Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
This commit is contained in:
parent
0443705c07
commit
56c1585cb1
1 changed files with 62 additions and 49 deletions
|
@ -2,6 +2,7 @@ package resource
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -132,6 +133,64 @@ func (c *controller) AddEventHandler(eventHandler EventHandler) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *controller) startWatcher(ctx context.Context, logger logr.Logger, gvr schema.GroupVersionResource, gvk schema.GroupVersionKind) (*watcher, error) {
|
||||
hashes := map[types.UID]Resource{}
|
||||
objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to list resources")
|
||||
return nil, err
|
||||
} else {
|
||||
resourceVersion := objs.GetResourceVersion()
|
||||
for _, obj := range objs.Items {
|
||||
uid := obj.GetUID()
|
||||
hash := reportutils.CalculateResourceHash(obj)
|
||||
hashes[uid] = Resource{
|
||||
Hash: hash,
|
||||
Namespace: obj.GetNamespace(),
|
||||
Name: obj.GetName(),
|
||||
}
|
||||
c.notify(Added, uid, gvk, hashes[uid])
|
||||
}
|
||||
logger := logger.WithValues("resourceVersion", resourceVersion)
|
||||
logger.Info("start watcher ...")
|
||||
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
logger.Info("creating watcher...")
|
||||
watch, err := c.client.GetDynamicInterface().Resource(gvr).Watch(context.Background(), options)
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to watch")
|
||||
}
|
||||
return watch, err
|
||||
}
|
||||
watchInterface, err := watchTools.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to create watcher")
|
||||
return nil, err
|
||||
} else {
|
||||
w := &watcher{
|
||||
watcher: watchInterface,
|
||||
gvk: gvk,
|
||||
hashes: hashes,
|
||||
}
|
||||
go func(gvr schema.GroupVersionResource) {
|
||||
defer logger.Info("watcher stopped")
|
||||
for event := range watchInterface.ResultChan() {
|
||||
switch event.Type {
|
||||
case watch.Added:
|
||||
c.updateHash(Added, event.Object.(*unstructured.Unstructured), gvr)
|
||||
case watch.Modified:
|
||||
c.updateHash(Modified, event.Object.(*unstructured.Unstructured), gvr)
|
||||
case watch.Deleted:
|
||||
c.deleteHash(event.Object.(*unstructured.Unstructured), gvr)
|
||||
case watch.Error:
|
||||
logger.Error(errors.New("watch error event received"), "watch error event received", "event", event.Object)
|
||||
}
|
||||
}
|
||||
}(gvr)
|
||||
return w, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) updateDynamicWatchers(ctx context.Context) error {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
@ -171,56 +230,10 @@ func (c *controller) updateDynamicWatchers(ctx context.Context) error {
|
|||
dynamicWatchers[gvr] = c.dynamicWatchers[gvr]
|
||||
delete(c.dynamicWatchers, gvr)
|
||||
} else {
|
||||
hashes := map[types.UID]Resource{}
|
||||
objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to list resources")
|
||||
if w, err := c.startWatcher(ctx, logger, gvr, gvk); err != nil {
|
||||
logger.Error(err, "failed to start watcher")
|
||||
} else {
|
||||
resourceVersion := objs.GetResourceVersion()
|
||||
for _, obj := range objs.Items {
|
||||
uid := obj.GetUID()
|
||||
hash := reportutils.CalculateResourceHash(obj)
|
||||
hashes[uid] = Resource{
|
||||
Hash: hash,
|
||||
Namespace: obj.GetNamespace(),
|
||||
Name: obj.GetName(),
|
||||
}
|
||||
c.notify(Added, uid, gvk, hashes[uid])
|
||||
}
|
||||
logger := logger.WithValues("resourceVersion", resourceVersion)
|
||||
logger.Info("start watcher ...")
|
||||
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
watch, err := c.client.GetDynamicInterface().Resource(gvr).Watch(context.Background(), options)
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to watch")
|
||||
}
|
||||
return watch, err
|
||||
}
|
||||
watchInterface, err := watchTools.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to create watcher")
|
||||
} else {
|
||||
w := &watcher{
|
||||
watcher: watchInterface,
|
||||
gvk: gvk,
|
||||
hashes: hashes,
|
||||
}
|
||||
go func() {
|
||||
gvr := gvr
|
||||
defer logger.Info("watcher stopped")
|
||||
for event := range watchInterface.ResultChan() {
|
||||
switch event.Type {
|
||||
case watch.Added:
|
||||
c.updateHash(Added, event.Object.(*unstructured.Unstructured), gvr)
|
||||
case watch.Modified:
|
||||
c.updateHash(Modified, event.Object.(*unstructured.Unstructured), gvr)
|
||||
case watch.Deleted:
|
||||
c.deleteHash(event.Object.(*unstructured.Unstructured), gvr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
dynamicWatchers[gvr] = w
|
||||
}
|
||||
dynamicWatchers[gvr] = w
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue