1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-14 11:57:48 +00:00

feat: add explicit key support to controller utils (#4628)

Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>

Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-09-19 13:25:03 +02:00 committed by GitHub
parent 71404df826
commit 6eea7c45f7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 28 deletions

View file

@ -1,6 +1,8 @@
package controller
import (
"errors"
"github.com/go-logr/logr"
kubeutils "github.com/kyverno/kyverno/pkg/utils/kube"
"k8s.io/client-go/tools/cache"
@ -11,6 +13,7 @@ type (
addFunc func(interface{})
updateFunc func(interface{}, interface{})
deleteFunc func(interface{})
keyFunc func(interface{}) (interface{}, error)
)
func AddEventHandlers(informer cache.SharedInformer, a addFunc, u updateFunc, d deleteFunc) {
@ -21,32 +24,57 @@ func AddEventHandlers(informer cache.SharedInformer, a addFunc, u updateFunc, d
})
}
func AddDefaultEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface) {
AddEventHandlers(informer, AddFunc(logger, queue), UpdateFunc(logger, queue), DeleteFunc(logger, queue))
func AddKeyedEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, parseKey keyFunc) {
AddEventHandlers(informer, AddFunc(logger, queue, parseKey), UpdateFunc(logger, queue, parseKey), DeleteFunc(logger, queue, parseKey))
}
func Enqueue(logger logr.Logger, queue workqueue.RateLimitingInterface, obj interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil {
logger.Error(err, "failed to compute key name")
func AddDefaultEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface) {
AddKeyedEventHandlers(logger, informer, queue, MetaNamespaceKey)
}
func AddExplicitEventHandlers[K any](logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, parseKey func(K) cache.ExplicitKey) {
AddKeyedEventHandlers(logger, informer, queue, ExplicitKey(parseKey))
}
func Enqueue(logger logr.Logger, queue workqueue.RateLimitingInterface, obj interface{}, parseKey keyFunc) {
if key, err := parseKey(obj); err != nil {
logger.Error(err, "failed to compute key name", "obj", obj)
} else {
queue.Add(key)
}
}
func AddFunc(logger logr.Logger, queue workqueue.RateLimitingInterface) addFunc {
return func(obj interface{}) {
Enqueue(logger, queue, obj)
func MetaNamespaceKey(obj interface{}) (interface{}, error) {
return cache.MetaNamespaceKeyFunc(obj)
}
func ExplicitKey[K any](parseKey func(K) cache.ExplicitKey) keyFunc {
return func(obj interface{}) (interface{}, error) {
if obj == nil {
return nil, errors.New("obj is nil")
}
if key, ok := obj.(K); !ok {
return nil, errors.New("obj cannot be converted")
} else {
return parseKey(key), nil
}
}
}
func UpdateFunc(logger logr.Logger, queue workqueue.RateLimitingInterface) updateFunc {
func AddFunc(logger logr.Logger, queue workqueue.RateLimitingInterface, parseKey keyFunc) addFunc {
return func(obj interface{}) {
Enqueue(logger, queue, obj, parseKey)
}
}
func UpdateFunc(logger logr.Logger, queue workqueue.RateLimitingInterface, parseKey keyFunc) updateFunc {
return func(_, obj interface{}) {
Enqueue(logger, queue, obj)
Enqueue(logger, queue, obj, parseKey)
}
}
func DeleteFunc(logger logr.Logger, queue workqueue.RateLimitingInterface) deleteFunc {
func DeleteFunc(logger logr.Logger, queue workqueue.RateLimitingInterface, parseKey keyFunc) deleteFunc {
return func(obj interface{}) {
Enqueue(logger, queue, kubeutils.GetObjectWithTombstone(obj))
Enqueue(logger, queue, kubeutils.GetObjectWithTombstone(obj), parseKey)
}
}

View file

@ -34,33 +34,40 @@ func worker(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetrie
}
func processNextWorkItem(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) bool {
if key, quit := queue.Get(); !quit {
defer queue.Done(key)
handleErr(logger, queue, maxRetries, reconcile(key.(string), r), key)
if obj, quit := queue.Get(); !quit {
defer queue.Done(obj)
handleErr(logger, queue, maxRetries, reconcile(obj, r), obj)
return true
}
return false
}
func handleErr(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, err error, key interface{}) {
func handleErr(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, err error, obj interface{}) {
if err == nil {
queue.Forget(key)
queue.Forget(obj)
} else if errors.IsNotFound(err) {
logger.V(4).Info("Dropping request from the queue", "key", key, "error", err.Error())
queue.Forget(key)
} else if queue.NumRequeues(key) < maxRetries {
logger.V(3).Info("Retrying request", "key", key, "error", err.Error())
queue.AddRateLimited(key)
logger.V(4).Info("Dropping request from the queue", "obj", obj, "error", err.Error())
queue.Forget(obj)
} else if queue.NumRequeues(obj) < maxRetries {
logger.V(3).Info("Retrying request", "obj", obj, "error", err.Error())
queue.AddRateLimited(obj)
} else {
logger.Error(err, "Failed to process request", "key", key)
queue.Forget(key)
logger.Error(err, "Failed to process request", "obj", obj)
queue.Forget(obj)
}
}
func reconcile(key string, r reconcileFunc) error {
if namespace, name, err := cache.SplitMetaNamespaceKey(key); err != nil {
return err
func reconcile(obj interface{}, r reconcileFunc) error {
var k, ns, n string
if key, ok := obj.(cache.ExplicitKey); ok {
k = string(key)
} else {
return r(key, namespace, name)
k = obj.(string)
if namespace, name, err := cache.SplitMetaNamespaceKey(k); err != nil {
return err
} else {
ns, n = namespace, name
}
}
return r(k, ns, n)
}