1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 02:18:15 +00:00

fix closed watchers in the resource-report-controller (#5350)

* fix: fix closed watchers in the resource-report-controller

Signed-off-by: Rodrigo Fior Kuntzer <rodrigo@miro.com>

* fix: using the resourceVersion from the list operation result

Signed-off-by: Rodrigo Fior Kuntzer <rodrigo@miro.com>

Signed-off-by: Rodrigo Fior Kuntzer <rodrigo@miro.com>
Co-authored-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
This commit is contained in:
Rodrigo Fior Kuntzer 2022-11-17 10:37:51 +01:00 committed by GitHub
parent 449cd4356c
commit bb90bb25ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -22,6 +22,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchTools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/workqueue"
)
@ -153,45 +155,50 @@ func (c *controller) updateDynamicWatchers(ctx context.Context) error {
dynamicWatchers[gvr] = c.dynamicWatchers[gvr]
delete(c.dynamicWatchers, gvr)
} else {
logger.Info("start watcher ...", "gvr", gvr)
watchInterface, err := c.client.GetDynamicInterface().Resource(gvr).Watch(ctx, metav1.ListOptions{})
hashes := map[types.UID]Resource{}
objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{})
if err != nil {
logger.Error(err, "failed to create watcher", "gvr", gvr)
logger.Error(err, "failed to list resources", "gvr", gvr)
} else {
w := &watcher{
watcher: watchInterface,
gvk: gvk,
hashes: map[types.UID]Resource{},
resourceVersion := objs.GetResourceVersion()
for _, obj := range objs.Items {
uid := obj.GetUID()
hash := reportutils.CalculateResourceHash(obj)
hashes[uid] = Resource{
Hash: hash,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
c.notify(uid, gvk, hashes[uid])
}
go func() {
gvr := gvr
defer logger.Info("watcher stopped")
for event := range watchInterface.ResultChan() {
switch event.Type {
case watch.Added:
c.updateHash(event.Object.(*unstructured.Unstructured), gvr)
case watch.Modified:
c.updateHash(event.Object.(*unstructured.Unstructured), gvr)
case watch.Deleted:
c.deleteHash(event.Object.(*unstructured.Unstructured), gvr)
}
}
}()
objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{})
logger.Info("start watcher ...", "gvr", gvr, "resourceVersion", resourceVersion)
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
return c.client.GetDynamicInterface().Resource(gvr).Watch(ctx, options)
}
watchInterface, err := watchTools.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
if err != nil {
logger.Error(err, "failed to list resources", "gvr", gvr)
watchInterface.Stop()
logger.Error(err, "failed to create watcher", "gvr", gvr)
} else {
for _, obj := range objs.Items {
uid := obj.GetUID()
hash := reportutils.CalculateResourceHash(obj)
w.hashes[uid] = Resource{
Hash: hash,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
c.notify(uid, w.gvk, w.hashes[uid])
w := &watcher{
watcher: watchInterface,
gvk: gvk,
hashes: hashes,
}
go func() {
gvr := gvr
defer logger.Info("watcher stopped")
for event := range watchInterface.ResultChan() {
switch event.Type {
case watch.Added:
c.updateHash(event.Object.(*unstructured.Unstructured), gvr)
case watch.Modified:
c.updateHash(event.Object.(*unstructured.Unstructured), gvr)
case watch.Deleted:
c.deleteHash(event.Object.(*unstructured.Unstructured), gvr)
}
}
}()
dynamicWatchers[gvr] = w
}
}