1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-13 19:28:55 +00:00

fix: use controller utils package in ttl controller (#8169)

* included controller-util

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* refactor

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* updated event handlers

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* added registration

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* fix

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* fix

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* event handler refactor

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* lint

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* enhancements

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* util refactor

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* removed comments

Signed-off-by: Ved Ratan <vedratan8@gmail.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix handler

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

---------

Signed-off-by: Ved Ratan <vedratan8@gmail.com>
Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Co-authored-by: shuting <shuting@nirmata.com>
Co-authored-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
This commit is contained in:
Ved Ratan 2023-09-08 12:42:34 +05:30 committed by GitHub
parent 86039a3b32
commit 10dacd5292
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 93 deletions

View file

@ -7,6 +7,7 @@ import (
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/api/kyverno"
"github.com/kyverno/kyverno/pkg/metrics"
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
@ -14,18 +15,22 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const (
// Workers is the number of workers for this controller
maxRetries = 10
)
type controller struct {
name string
client metadata.Getter
queue workqueue.RateLimitingInterface
lister cache.GenericLister
wg wait.Group
informer cache.SharedIndexInformer
registration cache.ResourceEventHandlerRegistration
logger logr.Logger
@ -39,21 +44,29 @@ type ttlMetrics struct {
}
func newController(client metadata.Getter, metainformer informers.GenericInformer, logger logr.Logger, gvr schema.GroupVersionResource) (*controller, error) {
name := gvr.Version + "/" + gvr.Resource
if gvr.Group != "" {
name = gvr.Group + "/" + name
}
queue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: name})
c := &controller{
name: name,
client: client,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queue: queue,
lister: metainformer.Lister(),
wg: wait.Group{},
informer: metainformer.Informer(),
logger: logger,
metrics: newTTLMetrics(logger),
}
registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleAdd,
UpdateFunc: c.handleUpdate,
})
enqueue := controllerutils.LogError(logger, controllerutils.Parse(controllerutils.MetaNamespaceKey, controllerutils.Queue(queue)))
registration, err := controllerutils.AddEventHandlers(
c.informer,
controllerutils.AddFunc(logger, enqueue),
controllerutils.UpdateFunc(logger, enqueue),
nil,
)
if err != nil {
logger.Error(err, "failed to register event handler")
logger.Error(err, "failed to register event handlers")
return nil, err
}
c.registration = registration
@ -82,46 +95,18 @@ func newTTLMetrics(logger logr.Logger) ttlMetrics {
}
}
func (c *controller) handleAdd(obj interface{}) {
c.enqueue(obj)
}
func (c *controller) handleUpdate(oldObj, newObj interface{}) {
old := oldObj.(metav1.Object)
new := newObj.(metav1.Object)
if old.GetResourceVersion() != new.GetResourceVersion() {
c.enqueue(newObj)
}
}
func (c *controller) Start(ctx context.Context, workers int) {
for i := 0; i < workers; i++ {
c.wg.StartWithContext(ctx, func(ctx context.Context) {
defer c.logger.V(3).Info("worker stopped")
c.logger.V(3).Info("worker starting ....")
wait.UntilWithContext(ctx, c.worker, 1*time.Second)
})
}
controllerutils.Run(ctx, c.logger, c.name, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) Stop() {
defer c.logger.V(3).Info("queue stopped")
defer c.wg.Wait()
// Unregister the event handlers
c.deregisterEventHandlers()
c.logger.V(3).Info("queue stopping ....")
c.queue.ShutDown()
}
func (c *controller) enqueue(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(err, "failed to extract name")
return
}
c.queue.Add(key)
}
// deregisterEventHandlers deregisters the event handlers from the informer.
func (c *controller) deregisterEventHandlers() {
err := c.informer.RemoveEventHandler(c.registration)
@ -132,36 +117,7 @@ func (c *controller) deregisterEventHandlers() {
c.logger.V(3).Info("deregistered event handlers")
}
func (c *controller) worker(ctx context.Context) {
for {
if !c.processItem() {
// No more items in the queue, exit the loop
break
}
}
}
func (c *controller) processItem() bool {
item, shutdown := c.queue.Get()
if shutdown {
return false
}
// In any case we need to call Done
defer c.queue.Done(item)
err := c.reconcile(item.(string))
if err != nil {
c.logger.Error(err, "reconciliation failed")
c.queue.AddRateLimited(item)
return true
} else {
// If no error, we call Forget to reset the rate limiter
c.queue.Forget(item)
}
return true
}
func (c *controller) reconcile(itemKey string) error {
logger := c.logger.WithValues("key", itemKey)
func (c *controller) reconcile(ctx context.Context, logger logr.Logger, itemKey string, _, _ string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(itemKey)
if err != nil {
return err
@ -175,43 +131,33 @@ func (c *controller) reconcile(itemKey string) error {
// there was an error, return it to requeue the key
return err
}
metaObj, err := meta.Accessor(obj)
if err != nil {
logger.Info("object is not of type metav1.Object")
return err
}
commonLabels := []attribute.KeyValue{
attribute.String("resource_namespace", metaObj.GetNamespace()),
attribute.String("resource_group", c.gvr.Group),
attribute.String("resource_version", c.gvr.Version),
attribute.String("resource", c.gvr.Resource),
}
// if the object is being deleted, return early
if metaObj.GetDeletionTimestamp() != nil {
return nil
}
labels := metaObj.GetLabels()
ttlValue, ok := labels[kyverno.LabelCleanupTtl]
if !ok {
// No 'ttl' label present, no further action needed
return nil
}
var deletionTime time.Time
// Try parsing ttlValue as duration
err = parseDeletionTime(metaObj, &deletionTime, ttlValue)
if err != nil {
if err := parseDeletionTime(metaObj, &deletionTime, ttlValue); err != nil {
logger.Error(err, "failed to parse label", "value", ttlValue)
return nil
}
if time.Now().After(deletionTime) {
err = c.client.Namespace(namespace).Delete(context.Background(), metaObj.GetName(), metav1.DeleteOptions{})
if err != nil {

View file

@ -148,16 +148,17 @@ func (m *manager) start(ctx context.Context, gvr schema.GroupVersionResource, wo
options,
)
cont, cancel := context.WithCancel(ctx)
var wg wait.Group
wg.StartWithContext(cont, func(ctx context.Context) {
var informerWaitGroup wait.Group
informerWaitGroup.StartWithContext(cont, func(ctx context.Context) {
logger.V(3).Info("informer starting...")
defer logger.V(3).Info("informer stopping...")
informer.Informer().Run(cont.Done())
})
stopInformer := func() {
// Send stop signal to informer's goroutine
cancel()
// Wait for the group to terminate
wg.Wait()
informerWaitGroup.Wait()
}
if !cache.WaitForCacheSync(ctx.Done(), informer.Informer().HasSynced) {
stopInformer()
@ -168,11 +169,16 @@ func (m *manager) start(ctx context.Context, gvr schema.GroupVersionResource, wo
stopInformer()
return err
}
logger.V(3).Info("controller starting...")
controller.Start(cont, workers)
var controllerWaitGroup wait.Group
controllerWaitGroup.StartWithContext(cont, func(ctx context.Context) {
logger.V(3).Info("controller starting...")
defer logger.V(3).Info("controller stopping...")
controller.Start(ctx, workers)
})
m.resController[gvr] = func() {
stopInformer()
controller.Stop()
controllerWaitGroup.Wait()
}
return nil
}

View file

@ -25,22 +25,33 @@ type (
)
func AddEventHandlers(informer cache.SharedInformer, a addFunc, u updateFunc, d deleteFunc) (cache.ResourceEventHandlerRegistration, error) {
var onDelete deleteFunc
if d != nil {
onDelete = func(obj interface{}) {
d(kubeutils.GetObjectWithTombstone(obj))
}
}
return informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: a,
UpdateFunc: u,
DeleteFunc: func(obj interface{}) {
d(kubeutils.GetObjectWithTombstone(obj))
},
DeleteFunc: onDelete,
})
}
func AddEventHandlersT[T any](informer cache.SharedInformer, a addFuncT[T], u updateFuncT[T], d deleteFuncT[T]) (cache.ResourceEventHandlerRegistration, error) {
return AddEventHandlers(
informer,
func(obj interface{}) { a(obj.(T)) },
func(old, obj interface{}) { u(old.(T), obj.(T)) },
func(obj interface{}) { d(obj.(T)) },
)
var onAdd addFunc
var onUpdate updateFunc
var onDelete deleteFunc
if a != nil {
onAdd = func(obj interface{}) { a(obj.(T)) }
}
if u != nil {
onUpdate = func(old, obj interface{}) { u(old.(T), obj.(T)) }
}
if d != nil {
onDelete = func(obj interface{}) { d(obj.(T)) }
}
return AddEventHandlers(informer, onAdd, onUpdate, onDelete)
}
func AddKeyedEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, parseKey keyFunc) (EnqueueFunc, cache.ResourceEventHandlerRegistration, error) {