1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-15 17:51:20 +00:00
kyverno/pkg/event/controller.go
Charles-Edouard Brétéché 39d5ceb00c
refactor: event package (#6124)
* refactor: event package

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* more

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* kuttl tests

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* Update pkg/event/source.go

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
2023-01-26 21:19:02 +00:00

215 lines
6.3 KiB
Go

package event
import (
"context"
"time"
"github.com/go-logr/logr"
kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
"github.com/kyverno/kyverno/pkg/clients/dclient"
"github.com/kyverno/kyverno/pkg/controllers"
corev1 "k8s.io/api/core/v1"
errors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
// Settings for the work queue that backs the event generator.
const (
// eventWorkQueueName is the name given to the named rate-limiting work queue
// created in NewEventGenerator.
eventWorkQueueName = "kyverno-events"
// workQueueRetryLimit bounds retries in handleErr: a failed item is re-enqueued
// only while its requeue count is below this value.
workQueueRetryLimit = 3
)
// generator queues event requests (Info) and, from its worker goroutines,
// resolves the referenced object and records an event on it through the
// recorder that matches the request's source.
type generator struct {
// client fetches resources that are neither ClusterPolicy nor Policy and
// supplies the events interface handed to each recorder.
client dclient.Interface
// cpLister gets cluster policies by name.
cpLister kyvernov1listers.ClusterPolicyLister
// pLister gets namespaced policies by namespace and name.
pLister kyvernov1listers.PolicyLister
// queue stores pending event generation requests.
queue workqueue.RateLimitingInterface
// policyCtrRecorder records events whose source is PolicyController.
policyCtrRecorder record.EventRecorder
// admissionCtrRecorder records events whose source is AdmissionController.
admissionCtrRecorder record.EventRecorder
// genPolicyRecorder records events whose source is GeneratePolicyController.
genPolicyRecorder record.EventRecorder
// mutateExistingRecorder records events whose source is MutateExistingController.
mutateExistingRecorder record.EventRecorder
// maxQueuedEvents is the queue-size threshold checked by Add: the whole batch
// is dropped when this is 0 or when the queue length already exceeds it.
maxQueuedEvents int
// log is the logger used by every method of the generator.
log logr.Logger
}
// Controller is the full contract of the event controller: the project's
// generic controllers.Controller combined with the event-producing Interface.
// NewEventGenerator returns a value of this type.
type Controller interface {
controllers.Controller
Interface
}
// Interface is the producer-facing API for requesting events.
type Interface interface {
// Add submits zero or more event requests for generation.
Add(infoList ...Info)
}
// NewEventGenerator builds the event controller.
//
// client supplies the events interface for every recorder and is kept for
// resource lookups; cpInformer and pInformer provide the listers used to
// resolve ClusterPolicy and Policy objects; maxQueuedEvents is the queue-size
// threshold enforced by Add; log is the logger used by the controller.
func NewEventGenerator(
	client dclient.Interface,
	cpInformer kyvernov1informers.ClusterPolicyInformer,
	pInformer kyvernov1informers.PolicyInformer,
	maxQueuedEvents int,
	log logr.Logger,
) Controller {
	gen := &generator{
		client:          client,
		maxQueuedEvents: maxQueuedEvents,
		log:             log,
	}

	// Listers backing policy lookups.
	gen.cpLister = cpInformer.Lister()
	gen.pLister = pInformer.Lister()

	// Named, per-item rate-limited queue holding pending event requests.
	gen.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), eventWorkQueueName)

	// One recorder per event source.
	gen.policyCtrRecorder = NewRecorder(PolicyController, client.GetEventsInterface())
	gen.admissionCtrRecorder = NewRecorder(AdmissionController, client.GetEventsInterface())
	gen.genPolicyRecorder = NewRecorder(GeneratePolicyController, client.GetEventsInterface())
	gen.mutateExistingRecorder = NewRecorder(MutateExistingController, client.GetEventsInterface())

	return gen
}
// Add enqueues the given event requests for asynchronous generation.
//
// The whole batch is dropped when maxQueuedEvents is 0 or when the queue
// length already exceeds it; the check is made once, before any item of the
// batch is enqueued. Requests with an empty Name are skipped individually.
func (gen *generator) Add(infos ...Info) {
	log := gen.log
	log.V(3).Info("generating events", "count", len(infos))

	limit := gen.maxQueuedEvents
	if limit == 0 || gen.queue.Len() > limit {
		log.V(2).Info("exceeds the event queue limit, dropping the event", "maxQueuedEvents", limit, "current size", gen.queue.Len())
		return
	}

	for i := range infos {
		info := infos[i]
		if info.Name != "" {
			gen.queue.Add(info)
			continue
		}
		// Resources created with generateName have no name yet, so there is
		// nothing to attach an event to.
		log.V(3).Info("skipping event creation for resource without a name", "kind", info.Kind, "name", info.Name, "namespace", info.Namespace)
	}
}
// Run starts the given number of worker goroutines and blocks until ctx is
// cancelled.
//
// Each worker repeatedly drains the queue via runWorker and is restarted by
// wait.UntilWithContext one second after runWorker returns. On return the
// queue is shut down so that workers blocked in queue.Get() are released;
// without this they would never observe the cancelled context and would leak.
// After Run returns the queue no longer accepts new items.
func (gen *generator) Run(ctx context.Context, workers int) {
	logger := gen.log
	defer utilruntime.HandleCrash()
	logger.Info("start")
	defer logger.Info("shutting down")
	// Deferred last so it runs first: unblock the workers before logging.
	defer gen.queue.ShutDown()
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, gen.runWorker, time.Second)
	}
	<-ctx.Done()
}
// runWorker processes queue items one after another until
// processNextWorkItem reports that the queue has been shut down.
// The ctx parameter is unused; it is present so the method has the
// func(context.Context) shape passed to wait.UntilWithContext in Run.
func (gen *generator) runWorker(ctx context.Context) {
	for {
		if !gen.processNextWorkItem() {
			return
		}
	}
}
// handleErr settles a processed queue item according to the outcome err.
//
//   - err == nil: the item's rate-limit history is cleared.
//   - err != nil and the item has been requeued fewer than workQueueRetryLimit
//     times: the item is re-enqueued through the rate limiter.
//   - otherwise: the item is given up on; the failure is logged unless it is a
//     NotFound error.
func (gen *generator) handleErr(err error, key interface{}) {
	switch {
	case err == nil:
		gen.queue.Forget(key)

	case gen.queue.NumRequeues(key) < workQueueRetryLimit:
		gen.log.V(4).Info("retrying event generation", "key", key, "reason", err.Error())
		// The queue's rate limiter and the item's requeue history decide when
		// it will be handed to a worker again.
		gen.queue.AddRateLimited(key)

	default:
		// Retry budget exhausted: stop tracking the item.
		gen.queue.Forget(key)
		if !errors.IsNotFound(err) {
			gen.log.Error(err, "failed to generate event", "key", key)
		}
	}
}
// processNextWorkItem takes one item off the queue, generates its event and
// hands the result to handleErr. It returns false only when the queue has been
// shut down, which tells the calling worker loop to stop.
func (gen *generator) processNextWorkItem() bool {
	item, quit := gen.queue.Get()
	if quit {
		return false
	}
	// Always mark the item as done, whatever the outcome.
	defer gen.queue.Done(item)

	info, isInfo := item.(Info)
	if !isInfo {
		// Not an event request: discard it without retrying.
		gen.queue.Forget(item)
		gen.log.V(2).Info("Incorrect type; expected type 'info'", "obj", item)
		return true
	}

	gen.handleErr(gen.syncHandler(info), item)
	return true
}
// syncHandler resolves the object referenced by key and records an event on it.
//
// ClusterPolicy and Policy objects come from the listers; any other kind is
// fetched through the client. A returned error makes the caller retry via
// handleErr; nil means the request is finished, whether or not an event was
// actually recorded.
func (gen *generator) syncHandler(key Info) error {
	var (
		target runtime.Object
		err    error
	)

	switch key.Kind {
	case "ClusterPolicy":
		if target, err = gen.cpLister.Get(key.Name); err != nil {
			gen.log.Error(err, "failed to get cluster policy", "name", key.Name)
			return err
		}
	case "Policy":
		if target, err = gen.pLister.Policies(key.Namespace).Get(key.Name); err != nil {
			gen.log.Error(err, "failed to get policy", "name", key.Name)
			return err
		}
	default:
		target, err = gen.client.GetResource(context.TODO(), "", key.Kind, key.Namespace, key.Name)
		if err != nil {
			// NotFound is returned so the request is retried; presumably the
			// resource may not exist yet when the event is requested (confirm).
			if errors.IsNotFound(err) {
				return err
			}
			// Any other lookup failure is logged and the request is dropped.
			gen.log.Error(err, "failed to get resource", "kind", key.Kind, "name", key.Name, "namespace", key.Namespace)
			return nil
		}
	}

	// The event type follows the reason: applied/skipped policies yield a
	// Normal event, every other reason yields a Warning.
	eventType := corev1.EventTypeNormal
	if key.Reason != PolicyApplied && key.Reason != PolicySkipped {
		eventType = corev1.EventTypeWarning
	}

	// Each event source has its own recorder.
	reason := string(key.Reason)
	switch key.Source {
	case AdmissionController:
		gen.admissionCtrRecorder.Event(target, eventType, reason, key.Message)
	case PolicyController:
		gen.policyCtrRecorder.Event(target, eventType, reason, key.Message)
	case GeneratePolicyController:
		gen.genPolicyRecorder.Event(target, eventType, reason, key.Message)
	case MutateExistingController:
		gen.mutateExistingRecorder.Event(target, eventType, reason, key.Message)
	default:
		gen.log.Info("info.source not defined for the request")
	}
	return nil
}