diff --git a/pkg/utils/controller/handlers.go b/pkg/utils/controller/handlers.go index 87fccde0fe..e1b7d2de31 100644 --- a/pkg/utils/controller/handlers.go +++ b/pkg/utils/controller/handlers.go @@ -1,6 +1,8 @@ package controller import ( + "errors" + "github.com/go-logr/logr" kubeutils "github.com/kyverno/kyverno/pkg/utils/kube" "k8s.io/client-go/tools/cache" @@ -11,6 +13,7 @@ type ( addFunc func(interface{}) updateFunc func(interface{}, interface{}) deleteFunc func(interface{}) + keyFunc func(interface{}) (interface{}, error) ) func AddEventHandlers(informer cache.SharedInformer, a addFunc, u updateFunc, d deleteFunc) { @@ -21,32 +24,57 @@ func AddEventHandlers(informer cache.SharedInformer, a addFunc, u updateFunc, d }) } -func AddDefaultEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface) { - AddEventHandlers(informer, AddFunc(logger, queue), UpdateFunc(logger, queue), DeleteFunc(logger, queue)) +func AddKeyedEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, parseKey keyFunc) { + AddEventHandlers(informer, AddFunc(logger, queue, parseKey), UpdateFunc(logger, queue, parseKey), DeleteFunc(logger, queue, parseKey)) } -func Enqueue(logger logr.Logger, queue workqueue.RateLimitingInterface, obj interface{}) { - if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil { - logger.Error(err, "failed to compute key name") +func AddDefaultEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface) { + AddKeyedEventHandlers(logger, informer, queue, MetaNamespaceKey) +} + +func AddExplicitEventHandlers[K any](logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, parseKey func(K) cache.ExplicitKey) { + AddKeyedEventHandlers(logger, informer, queue, ExplicitKey(parseKey)) +} + +func Enqueue(logger logr.Logger, queue workqueue.RateLimitingInterface, obj interface{}, parseKey keyFunc) { + if key, err := parseKey(obj); err != nil { + logger.Error(err, "failed to compute key name", "obj", obj) } else { queue.Add(key) } } -func AddFunc(logger logr.Logger, queue workqueue.RateLimitingInterface) addFunc { - return func(obj interface{}) { - Enqueue(logger, queue, obj) +func MetaNamespaceKey(obj interface{}) (interface{}, error) { + return cache.MetaNamespaceKeyFunc(obj) +} + +func ExplicitKey[K any](parseKey func(K) cache.ExplicitKey) keyFunc { + return func(obj interface{}) (interface{}, error) { + if obj == nil { + return nil, errors.New("obj is nil") + } + if key, ok := obj.(K); !ok { + return nil, errors.New("obj cannot be converted") + } else { + return parseKey(key), nil + } } } -func UpdateFunc(logger logr.Logger, queue workqueue.RateLimitingInterface) updateFunc { +func AddFunc(logger logr.Logger, queue workqueue.RateLimitingInterface, parseKey keyFunc) addFunc { + return func(obj interface{}) { + Enqueue(logger, queue, obj, parseKey) + } +} + +func UpdateFunc(logger logr.Logger, queue workqueue.RateLimitingInterface, parseKey keyFunc) updateFunc { return func(_, obj interface{}) { - Enqueue(logger, queue, obj) + Enqueue(logger, queue, obj, parseKey) } } -func DeleteFunc(logger logr.Logger, queue workqueue.RateLimitingInterface) deleteFunc { +func DeleteFunc(logger logr.Logger, queue workqueue.RateLimitingInterface, parseKey keyFunc) deleteFunc { return func(obj interface{}) { - Enqueue(logger, queue, kubeutils.GetObjectWithTombstone(obj)) + Enqueue(logger, queue, kubeutils.GetObjectWithTombstone(obj), parseKey) } } diff --git a/pkg/utils/controller/run.go b/pkg/utils/controller/run.go index cc35a87da3..8a54d4fe58 100644 --- a/pkg/utils/controller/run.go +++ b/pkg/utils/controller/run.go @@ -34,33 +34,40 @@ func worker(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetrie } func processNextWorkItem(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) bool { - if key, quit := queue.Get(); !quit { - defer queue.Done(key) - handleErr(logger, queue, maxRetries, reconcile(key.(string), r), key) + if obj, quit := queue.Get(); !quit { + defer queue.Done(obj) + handleErr(logger, queue, maxRetries, reconcile(obj, r), obj) return true } return false } -func handleErr(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, err error, key interface{}) { +func handleErr(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, err error, obj interface{}) { if err == nil { - queue.Forget(key) + queue.Forget(obj) } else if errors.IsNotFound(err) { - logger.V(4).Info("Dropping request from the queue", "key", key, "error", err.Error()) - queue.Forget(key) - } else if queue.NumRequeues(key) < maxRetries { - logger.V(3).Info("Retrying request", "key", key, "error", err.Error()) - queue.AddRateLimited(key) + logger.V(4).Info("Dropping request from the queue", "obj", obj, "error", err.Error()) + queue.Forget(obj) + } else if queue.NumRequeues(obj) < maxRetries { + logger.V(3).Info("Retrying request", "obj", obj, "error", err.Error()) + queue.AddRateLimited(obj) } else { - logger.Error(err, "Failed to process request", "key", key) - queue.Forget(key) + logger.Error(err, "Failed to process request", "obj", obj) + queue.Forget(obj) } } -func reconcile(key string, r reconcileFunc) error { - if namespace, name, err := cache.SplitMetaNamespaceKey(key); err != nil { - return err +func reconcile(obj interface{}, r reconcileFunc) error { + var k, ns, n string + if key, ok := obj.(cache.ExplicitKey); ok { + k = string(key) } else { - return r(key, namespace, name) + k = obj.(string) + if namespace, name, err := cache.SplitMetaNamespaceKey(k); err != nil { + return err + } else { + ns, n = namespace, name + } } + return r(k, ns, n) }