1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 02:18:15 +00:00

chore: improve cleanup controller (#5509)

* chore: improve cleanup controller

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* chore: improve cleanup controller

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-11-30 17:23:12 +01:00 committed by GitHub
parent 83bbf87ff6
commit 83b088ecb9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 91 additions and 99 deletions

View file

@ -30,7 +30,9 @@ type controller struct {
cjLister batchv1listers.CronJobLister
// queue
queue workqueue.RateLimitingInterface
queue workqueue.RateLimitingInterface
cpolEnqueue controllerutils.EnqueueFuncT[*kyvernov1alpha1.ClusterCleanupPolicy]
polEnqueue controllerutils.EnqueueFuncT[*kyvernov1alpha1.CleanupPolicy]
}
const (
@ -45,99 +47,21 @@ func NewController(
polInformer kyvernov1alpha1informers.CleanupPolicyInformer,
cjInformer batchv1informers.CronJobInformer,
) controllers.Controller {
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)
c := &controller{
client: client,
cpolLister: cpolInformer.Lister(),
polLister: polInformer.Lister(),
cjLister: cjInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
client: client,
cpolLister: cpolInformer.Lister(),
polLister: polInformer.Lister(),
cjLister: cjInformer.Lister(),
queue: queue,
cpolEnqueue: controllerutils.AddDefaultEventHandlersT[*kyvernov1alpha1.ClusterCleanupPolicy](logger, cpolInformer.Informer(), queue),
polEnqueue: controllerutils.AddDefaultEventHandlersT[*kyvernov1alpha1.CleanupPolicy](logger, polInformer.Informer(), queue),
}
controllerutils.AddDefaultEventHandlers(logger, cpolInformer.Informer(), c.queue)
controllerutils.AddDefaultEventHandlers(logger, polInformer.Informer(), c.queue)
cpolEnqueue := controllerutils.AddDefaultEventHandlers(logger, cpolInformer.Informer(), c.queue)
polEnqueue := controllerutils.AddDefaultEventHandlers(logger, polInformer.Informer(), c.queue)
controllerutils.AddEventHandlersT(
cjInformer.Informer(),
func(n *batchv1.CronJob) {
if len(n.OwnerReferences) == 1 {
if n.OwnerReferences[0].Kind == "ClusterCleanupPolicy" {
cpol := kyvernov1alpha1.ClusterCleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
},
}
err := cpolEnqueue(&cpol)
if err != nil {
logger.Error(err, "failed to enqueue ClusterCleanupPolicy object", cpol)
}
} else if n.OwnerReferences[0].Kind == "CleanupPolicy" {
pol := kyvernov1alpha1.CleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
Namespace: n.Namespace,
},
}
err := polEnqueue(&pol)
if err != nil {
logger.Error(err, "failed to enqueue CleanupPolicy object", pol)
}
}
}
},
func(o *batchv1.CronJob, n *batchv1.CronJob) {
if o.GetResourceVersion() != n.GetResourceVersion() {
for _, owner := range n.OwnerReferences {
if owner.Kind == "ClusterCleanupPolicy" {
cpol := kyvernov1alpha1.ClusterCleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: owner.Name,
},
}
err := cpolEnqueue(&cpol)
if err != nil {
logger.Error(err, "failed to enqueue ClusterCleanupPolicy object", cpol)
}
} else if owner.Kind == "CleanupPolicy" {
pol := kyvernov1alpha1.CleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: owner.Name,
Namespace: n.Namespace,
},
}
err := polEnqueue(&pol)
if err != nil {
logger.Error(err, "failed to enqueue CleanupPolicy object", pol)
}
}
}
}
},
func(n *batchv1.CronJob) {
if len(n.OwnerReferences) == 1 {
if n.OwnerReferences[0].Kind == "ClusterCleanupPolicy" {
cpol := kyvernov1alpha1.ClusterCleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
},
}
err := cpolEnqueue(&cpol)
if err != nil {
logger.Error(err, "failed to enqueue ClusterCleanupPolicy object", cpol)
}
} else if n.OwnerReferences[0].Kind == "CleanupPolicy" {
pol := kyvernov1alpha1.CleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
Namespace: n.Namespace,
},
}
err := polEnqueue(&pol)
if err != nil {
logger.Error(err, "failed to enqueue CleanupPolicy object", pol)
}
}
}
},
func(n *batchv1.CronJob) { c.enqueueCronJob(n) },
func(o *batchv1.CronJob, n *batchv1.CronJob) { c.enqueueCronJob(o) },
func(n *batchv1.CronJob) { c.enqueueCronJob(n) },
)
return c
}
@ -146,6 +70,33 @@ func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, logger.V(3), ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) enqueueCronJob(n *batchv1.CronJob) {
if len(n.OwnerReferences) == 1 {
if n.OwnerReferences[0].Kind == "ClusterCleanupPolicy" {
cpol := &kyvernov1alpha1.ClusterCleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
},
}
err := c.cpolEnqueue(cpol)
if err != nil {
logger.Error(err, "failed to enqueue ClusterCleanupPolicy object", cpol)
}
} else if n.OwnerReferences[0].Kind == "CleanupPolicy" {
pol := &kyvernov1alpha1.CleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
Namespace: n.Namespace,
},
}
err := c.polEnqueue(pol)
if err != nil {
logger.Error(err, "failed to enqueue CleanupPolicy object", pol)
}
}
}
}
func (c *controller) getPolicy(namespace, name string) (kyvernov1alpha1.CleanupPolicyInterface, error) {
if namespace == "" {
cpolicy, err := c.cpolLister.Get(name)

View file

@ -11,14 +11,15 @@ import (
)
type (
addFunc func(interface{})
updateFunc func(interface{}, interface{})
deleteFunc func(interface{})
addFunc = addFuncT[interface{}]
updateFunc = updateFuncT[interface{}]
deleteFunc = deleteFuncT[interface{}]
addFuncT[T any] func(T)
updateFuncT[T any] func(T, T)
deleteFuncT[T any] func(T)
keyFunc func(interface{}) (interface{}, error)
EnqueueFunc func(interface{}) error
keyFunc = keyFuncT[interface{}, interface{}]
keyFuncT[T, U any] func(T) (U, error)
EnqueueFunc = EnqueueFuncT[interface{}]
EnqueueFuncT[T any] func(T) error
)
@ -44,6 +45,12 @@ func AddKeyedEventHandlers(logger logr.Logger, informer cache.SharedInformer, qu
return enqueueFunc
}
func AddKeyedEventHandlersT[K metav1.Object](logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, parseKey keyFuncT[K, interface{}]) EnqueueFuncT[K] {
enqueueFunc := LogError(logger, Parse(parseKey, Queue(queue)))
AddEventHandlersT(informer, AddFuncT(logger, enqueueFunc), UpdateFuncT(logger, enqueueFunc), DeleteFuncT(logger, enqueueFunc))
return enqueueFunc
}
func AddDelayedKeyedEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, delay time.Duration, parseKey keyFunc) EnqueueFunc {
enqueueFunc := LogError(logger, Parse(parseKey, QueueAfter(queue, delay)))
AddEventHandlers(informer, AddFunc(logger, enqueueFunc), UpdateFunc(logger, enqueueFunc), DeleteFunc(logger, enqueueFunc))
@ -54,6 +61,10 @@ func AddDefaultEventHandlers(logger logr.Logger, informer cache.SharedInformer,
return AddKeyedEventHandlers(logger, informer, queue, MetaNamespaceKey)
}
func AddDefaultEventHandlersT[K metav1.Object](logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface) EnqueueFuncT[K] {
return AddKeyedEventHandlersT(logger, informer, queue, MetaNamespaceKeyT[K])
}
func AddDelayedDefaultEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, delay time.Duration) EnqueueFunc {
return AddDelayedKeyedEventHandlers(logger, informer, queue, delay, MetaNamespaceKey)
}
@ -66,8 +77,8 @@ func AddDelayedExplicitEventHandlers[K any](logger logr.Logger, informer cache.S
return AddDelayedKeyedEventHandlers(logger, informer, queue, delay, ExplicitKey(parseKey))
}
func LogError(logger logr.Logger, inner EnqueueFunc) EnqueueFunc {
return func(obj interface{}) error {
func LogError[K any](logger logr.Logger, inner EnqueueFuncT[K]) EnqueueFuncT[K] {
return func(obj K) error {
err := inner(obj)
if err != nil {
logger.Error(err, "failed to compute key name", "obj", obj)
@ -76,8 +87,8 @@ func LogError(logger logr.Logger, inner EnqueueFunc) EnqueueFunc {
}
}
func Parse(parseKey keyFunc, inner EnqueueFunc) EnqueueFunc {
return func(obj interface{}) error {
func Parse[K, L any](parseKey keyFuncT[K, L], inner EnqueueFuncT[L]) EnqueueFuncT[K] {
return func(obj K) error {
if key, err := parseKey(obj); err != nil {
return err
} else {
@ -104,6 +115,10 @@ func MetaNamespaceKey(obj interface{}) (interface{}, error) {
return cache.MetaNamespaceKeyFunc(obj)
}
func MetaNamespaceKeyT[T any](obj T) (interface{}, error) {
return MetaNamespaceKey(obj)
}
func ExplicitKey[K any](parseKey func(K) cache.ExplicitKey) keyFunc {
return func(obj interface{}) (interface{}, error) {
if obj == nil {
@ -144,3 +159,29 @@ func DeleteFunc(logger logr.Logger, enqueue EnqueueFunc) deleteFunc {
}
}
}
func AddFuncT[K metav1.Object](logger logr.Logger, enqueue EnqueueFuncT[K]) addFuncT[K] {
return func(obj K) {
if err := enqueue(obj); err != nil {
logger.Error(err, "failed to enqueue object", "obj", obj)
}
}
}
func UpdateFuncT[K metav1.Object](logger logr.Logger, enqueue EnqueueFuncT[K]) updateFuncT[K] {
return func(old, obj K) {
if old.GetResourceVersion() != obj.GetResourceVersion() {
if err := enqueue(obj); err != nil {
logger.Error(err, "failed to enqueue object", "obj", obj)
}
}
}
}
func DeleteFuncT[K metav1.Object](logger logr.Logger, enqueue EnqueueFuncT[K]) deleteFuncT[K] {
return func(obj K) {
if err := enqueue(obj); err != nil {
logger.Error(err, "failed to enqueue object", "obj", obj)
}
}
}