1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-14 11:57:48 +00:00
kyverno/pkg/event/controller.go
Khaled Emara 88798c3e39
feat: add new client for events (#9323)
Signed-off-by: Khaled Emara <khaled.emara@nirmata.com>
2024-01-03 01:12:05 +00:00

132 lines
4.1 KiB
Go

package event
import (
"context"
"sync"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme"
corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
)
const (
Workers = 3
CleanupWorkers = 3
eventWorkQueueName = "kyverno-events"
workQueueRetryLimit = 3
)
// generator generate events
type generator struct {
// broadcaster
broadcaster events.EventBroadcaster
// recorders
recorders map[Source]events.EventRecorder
// client
eventsClient eventsv1.EventsV1Interface
// config
omitEvents sets.Set[string]
logger logr.Logger
}
// Interface to generate event
type Interface interface {
Add(infoList ...Info)
}
// Controller interface to generate event
type Controller interface {
Interface
Run(context.Context, int, *sync.WaitGroup)
}
// NewEventGenerator to generate a new event controller
func NewEventGenerator(eventsClient eventsv1.EventsV1Interface, logger logr.Logger, omitEvents ...string) Controller {
return &generator{
broadcaster: events.NewBroadcaster(&events.EventSinkImpl{
Interface: eventsClient,
}),
eventsClient: eventsClient,
omitEvents: sets.New(omitEvents...),
logger: logger,
}
}
// Add queues an event for generation
func (gen *generator) Add(infos ...Info) {
logger := gen.logger
logger.V(3).Info("generating events", "count", len(infos))
for _, info := range infos {
// don't create event for resources with generateName as the name is not generated yet
if info.Regarding.Name == "" {
logger.V(3).Info("skipping event creation for resource without a name", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace)
continue
}
if gen.omitEvents.Has(string(info.Reason)) {
logger.V(6).Info("omitting event", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace, "reason", info.Reason)
continue
}
gen.emitEvent(info)
logger.V(6).Info("creating event", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace, "reason", info.Reason)
}
}
// Run begins generator
func (gen *generator) Run(ctx context.Context, workers int, waitGroup *sync.WaitGroup) {
logger := gen.logger
logger.Info("start")
defer logger.Info("terminated")
defer utilruntime.HandleCrash()
defer gen.stopRecorders()
defer logger.Info("shutting down...")
if err := gen.startRecorders(ctx); err != nil {
logger.Error(err, "failed to start recorders")
return
}
<-ctx.Done()
}
func (gen *generator) startRecorders(ctx context.Context) error {
if err := gen.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil {
return err
}
logger := klog.Background().V(int(0))
// TODO: logger watcher should be stopped
if _, err := gen.broadcaster.StartLogging(logger); err != nil {
return err
}
gen.recorders = map[Source]events.EventRecorder{
PolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(PolicyController)),
AdmissionController: gen.broadcaster.NewRecorder(scheme.Scheme, string(AdmissionController)),
GeneratePolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(GeneratePolicyController)),
MutateExistingController: gen.broadcaster.NewRecorder(scheme.Scheme, string(MutateExistingController)),
CleanupController: gen.broadcaster.NewRecorder(scheme.Scheme, string(CleanupController)),
}
return nil
}
func (gen *generator) stopRecorders() {
gen.broadcaster.Shutdown()
}
func (gen *generator) emitEvent(key Info) {
logger := gen.logger
eventType := corev1.EventTypeWarning
if key.Reason == PolicyApplied || key.Reason == PolicySkipped {
eventType = corev1.EventTypeNormal
}
if recorder := gen.recorders[key.Source]; recorder != nil {
logger.V(3).Info("creating the event", "source", key.Source, "type", eventType, "resource", key.Resource())
recorder.Eventf(&key.Regarding, key.Related, eventType, string(key.Reason), string(key.Action), key.Message)
} else {
logger.Info("info.source not defined for the request")
}
}